Skip to main content

mito2/sst/parquet/
flat_format.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Format to store in parquet.
16//!
17//! It can store both encoded primary key and raw key columns.
18//!
//! We store three additional internal columns at the end:
20//! - `__primary_key`, the encoded primary key of the row (tags). Type: dictionary(uint32, binary)
21//! - `__sequence`, the sequence number of a row. Type: uint64
22//! - `__op_type`, the op type of the row. Type: uint8
23//!
//! The format is
//! ```text
//! primary key columns, field columns, time index, encoded primary key, __sequence, __op_type.
//! ```
//!
//! It stores field columns in the same order as [RegionMetadata::field_columns()](store_api::metadata::RegionMetadata::field_columns())
//! and stores primary key columns in the same order as [RegionMetadata::primary_key].
30
31use std::borrow::Borrow;
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use api::v1::SemanticType;
36use arrow_schema::{DataType as ArrowDataType, FieldRef};
37use datatypes::arrow::array::{
38    Array, ArrayRef, BinaryArray, DictionaryArray, UInt32Array, UInt64Array,
39};
40use datatypes::arrow::compute::kernels::take::take;
41use datatypes::arrow::datatypes::{Schema, SchemaRef};
42use datatypes::arrow::record_batch::RecordBatch;
43use datatypes::prelude::{ConcreteDataType, DataType};
44use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec, build_primary_key_codec};
45use parquet::file::metadata::RowGroupMetaData;
46use snafu::{OptionExt, ResultExt, ensure};
47use store_api::codec::PrimaryKeyEncoding;
48use store_api::metadata::{RegionMetadata, RegionMetadataRef};
49use store_api::storage::{ColumnId, NestedPath, SequenceNumber};
50
51use crate::error::{
52    ComputeArrowSnafu, DecodeSnafu, InvalidParquetSnafu, InvalidRecordBatchSnafu,
53    NewRecordBatchSnafu, Result,
54};
55use crate::read::read_columns::ReadColumns;
56use crate::sst::parquet::format::{
57    FIXED_POS_COLUMN_NUM, FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray,
58    PrimaryKeyReadFormat, StatValues, column_null_counts, column_values,
59};
60use crate::sst::parquet::read_columns::ParquetReadColumns;
61use crate::sst::{
62    FlatSchemaOptions, flat_sst_arrow_schema_column_num, tag_maybe_to_dictionary_field,
63    to_flat_sst_arrow_schema, with_field_id,
64};
65
/// Helper for writing the SST format.
///
/// Holds the arrow schema of the SST file and an optional sequence number
/// that is applied to every batch passed to `convert_batch`.
pub(crate) struct FlatWriteFormat {
    /// SST file schema.
    arrow_schema: SchemaRef,
    /// Sequence number that replaces the sequence column of every converted batch.
    /// `None` means batches are written with their own sequence values.
    override_sequence: Option<SequenceNumber>,
}
72
73impl FlatWriteFormat {
74    /// Creates a new helper.
75    pub(crate) fn new(metadata: RegionMetadataRef, options: &FlatSchemaOptions) -> FlatWriteFormat {
76        let arrow_schema = to_flat_sst_arrow_schema(&metadata, options);
77        FlatWriteFormat {
78            arrow_schema,
79            override_sequence: None,
80        }
81    }
82
83    /// Set override sequence.
84    pub(crate) fn with_override_sequence(
85        mut self,
86        override_sequence: Option<SequenceNumber>,
87    ) -> Self {
88        self.override_sequence = override_sequence;
89        self
90    }
91
92    /// Gets the arrow schema to store in parquet.
93    #[cfg(test)]
94    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
95        &self.arrow_schema
96    }
97
98    /// Convert `batch` to a arrow record batch to store in parquet.
99    pub(crate) fn convert_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
100        debug_assert_eq!(batch.num_columns(), self.arrow_schema.fields().len());
101
102        let Some(override_sequence) = self.override_sequence else {
103            return Ok(batch.clone());
104        };
105
106        let mut columns = batch.columns().to_vec();
107        let sequence_array = Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
108        columns[sequence_column_index(batch.num_columns())] = sequence_array;
109
110        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
111    }
112}
113
/// Returns the position of the sequence column.
///
/// `num_columns` is the total number of columns in the batch. The sequence column
/// is the second column counted from the end (only the op type column follows it).
pub(crate) fn sequence_column_index(num_columns: usize) -> usize {
    const OFFSET_FROM_END: usize = 2;
    num_columns - OFFSET_FROM_END
}
118
/// Returns the position of the time index column.
///
/// `num_columns` is the total number of columns in the batch. The time index is the
/// fourth column counted from the end; the encoded primary key, sequence and op type
/// columns follow it.
pub(crate) fn time_index_column_index(num_columns: usize) -> usize {
    const OFFSET_FROM_END: usize = 4;
    num_columns - OFFSET_FROM_END
}
123
/// Returns the position of the encoded primary key column.
///
/// `num_columns` is the total number of columns in the batch. The primary key column
/// is the third column counted from the end; the sequence and op type columns follow it.
pub(crate) fn primary_key_column_index(num_columns: usize) -> usize {
    const OFFSET_FROM_END: usize = 3;
    num_columns - OFFSET_FROM_END
}
128
129/// Wraps the `__primary_key` `BinaryArray` back into a `DictionaryArray<UInt32, Binary>` with identity keys.
130pub(crate) fn wrap_pk_binary_to_dict(
131    record_batch: RecordBatch,
132    dict_schema: &SchemaRef,
133) -> Result<RecordBatch> {
134    let pk_idx = primary_key_column_index(record_batch.num_columns());
135    let pk_column = record_batch.column(pk_idx);
136    let binary_array = pk_column
137        .as_any()
138        .downcast_ref::<BinaryArray>()
139        .with_context(|| InvalidRecordBatchSnafu {
140            reason: format!(
141                "expected BinaryArray for __primary_key, got {:?}",
142                pk_column.data_type()
143            ),
144        })?;
145    let n = binary_array.len();
146    let keys = UInt32Array::from_iter_values(0..n as u32);
147    let dict_array: ArrayRef = Arc::new(DictionaryArray::new(keys, pk_column.clone()));
148
149    let mut columns = record_batch.columns().to_vec();
150    columns[pk_idx] = dict_array;
151
152    RecordBatch::try_new(dict_schema.clone(), columns).context(NewRecordBatchSnafu)
153}
154
/// Returns the position of the op type column.
///
/// `num_columns` is the total number of columns in the batch. The op type column
/// is always the last one.
pub(crate) fn op_type_column_index(num_columns: usize) -> usize {
    const OFFSET_FROM_END: usize = 1;
    num_columns - OFFSET_FROM_END
}
159
160/// Returns the start index of field columns in a flat batch.
161///
162/// `num_columns` is the total number of columns in the flat batch schema,
163/// including tag columns (if present), field columns, and fixed position columns
164/// (time index, primary key, sequence, op type).
165///
166/// For Dense encoding (raw PK columns included): field_column_start = primary_key.len()
167/// For Sparse encoding (no raw PK columns): field_column_start = 0
168pub(crate) fn field_column_start(metadata: &RegionMetadata, num_columns: usize) -> usize {
169    // Calculates field column start: total columns - fixed columns - field columns
170    // Field column count = total metadata columns - time index column - primary key columns
171    let field_column_count = metadata.column_metadatas.len() - 1 - metadata.primary_key.len();
172    num_columns - FIXED_POS_COLUMN_NUM - field_column_count
173}
174
175// TODO(yingwen): Add an option to skip reading internal columns if the region is
176// append only and doesn't use sparse encoding (We need to check the table id under
177// sparse encoding).
/// Helper for reading the flat SST format with projection.
///
/// It only supports flat format that stores primary keys additionally.
///
/// Files that were written in the legacy primary-key layout are handled through
/// the `ParquetAdapter::PrimaryKeyToFlat` adapter.
pub struct FlatReadFormat {
    /// Sequence number to override the sequence read from the SST.
    override_sequence: Option<SequenceNumber>,
    /// Parquet format adapter.
    parquet_adapter: ParquetAdapter,
    /// Output schema to wrap binary `__primary_key` back to a dictionary; `None` disables wrapping.
    pk_dict_wrap_schema: Option<SchemaRef>,
}
189
190impl FlatReadFormat {
191    /// Creates a helper with existing `metadata` and `column_ids` to read.
192    ///
193    /// If `skip_auto_convert` is true, skips auto conversion of format when the encoding is sparse encoding.
194    pub fn new(
195        metadata: RegionMetadataRef,
196        read_cols: ReadColumns,
197        file_schema: Option<SchemaRef>,
198        file_path: &str,
199        skip_auto_convert: bool,
200    ) -> Result<FlatReadFormat> {
201        let num_columns = file_schema.as_ref().map(|x| x.fields().len());
202        let is_legacy = match num_columns {
203            Some(num) => Self::is_legacy_format(&metadata, num, file_path)?,
204            None => metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse,
205        };
206
207        let parquet_adapter = if is_legacy {
208            // Safety: is_legacy_format() ensures primary_key is not empty.
209            if metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse {
210                // Only skip auto convert when the primary key encoding is sparse.
211                ParquetAdapter::PrimaryKeyToFlat(ParquetPrimaryKeyToFlat::new(
212                    metadata,
213                    read_cols,
214                    skip_auto_convert,
215                ))
216            } else {
217                ParquetAdapter::PrimaryKeyToFlat(ParquetPrimaryKeyToFlat::new(
218                    metadata, read_cols, false,
219                ))
220            }
221        } else {
222            let file_schema = file_schema
223                .unwrap_or_else(|| to_flat_sst_arrow_schema(&metadata, &Default::default()));
224            ParquetAdapter::Flat(ParquetFlat::new(metadata, read_cols, file_schema))
225        };
226
227        Ok(FlatReadFormat {
228            override_sequence: None,
229            parquet_adapter,
230            pk_dict_wrap_schema: None,
231        })
232    }
233
234    /// Sets the sequence number to override.
235    pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
236        self.override_sequence = sequence;
237    }
238
239    /// Enables wrapping binary `__primary_key` batches back to a dictionary in [`Self::convert_batch`].
240    pub(crate) fn set_pk_as_binary(&mut self) -> Result<()> {
241        self.pk_dict_wrap_schema = Some(self.output_arrow_schema()?);
242        Ok(())
243    }
244
245    /// Index of a column in the projected batch by its column id.
246    pub fn projected_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
247        self.format_projection()
248            .column_id_to_projected_index
249            .get(&column_id)
250            .copied()
251    }
252
253    /// Returns min values of specific column in row groups.
254    pub fn min_values(
255        &self,
256        row_groups: &[impl Borrow<RowGroupMetaData>],
257        column_id: ColumnId,
258    ) -> StatValues {
259        match &self.parquet_adapter {
260            ParquetAdapter::Flat(p) => p.min_values(row_groups, column_id),
261            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.min_values(row_groups, column_id),
262        }
263    }
264
265    /// Returns max values of specific column in row groups.
266    pub fn max_values(
267        &self,
268        row_groups: &[impl Borrow<RowGroupMetaData>],
269        column_id: ColumnId,
270    ) -> StatValues {
271        match &self.parquet_adapter {
272            ParquetAdapter::Flat(p) => p.max_values(row_groups, column_id),
273            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.max_values(row_groups, column_id),
274        }
275    }
276
277    /// Returns null counts of specific column in row groups.
278    pub fn null_counts(
279        &self,
280        row_groups: &[impl Borrow<RowGroupMetaData>],
281        column_id: ColumnId,
282    ) -> StatValues {
283        match &self.parquet_adapter {
284            ParquetAdapter::Flat(p) => p.null_counts(row_groups, column_id),
285            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.null_counts(row_groups, column_id),
286        }
287    }
288
289    /// Gets the arrow schema of the SST file.
290    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
291        match &self.parquet_adapter {
292            ParquetAdapter::Flat(p) => &p.arrow_schema,
293            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.arrow_schema(),
294        }
295    }
296
297    /// Gets the projected output schema produced by parquet reading.
298    pub(crate) fn output_arrow_schema(&self) -> Result<SchemaRef> {
299        let read_columns = self.parquet_read_columns();
300        let projection = read_columns.root_indices();
301        let mut schema = self
302            .arrow_schema()
303            .project(projection)
304            .context(ComputeArrowSnafu)?;
305        if read_columns.has_nested() {
306            debug_assert_eq!(schema.fields().len(), read_columns.columns().len());
307            let nested_paths = read_columns.columns().iter().map(|x| x.nested_paths());
308            prune_schema_by_nested_paths(&mut schema, nested_paths);
309        }
310        Ok(Arc::new(schema))
311    }
312
313    /// Gets the metadata of the SST.
314    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
315        match &self.parquet_adapter {
316            ParquetAdapter::Flat(p) => &p.metadata,
317            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.metadata(),
318        }
319    }
320
321    /// Get the sorted read columns to read from the sst file.
322    pub(crate) fn parquet_read_columns(&self) -> &ParquetReadColumns {
323        match &self.parquet_adapter {
324            ParquetAdapter::Flat(p) => &p.format_projection.parquet_read_cols,
325            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.parquet_read_columns(),
326        }
327    }
328
329    /// Gets the projection in the flat format.
330    ///
331    /// When `skip_auto_convert` is enabled (primary-key format read), this returns the
332    /// primary-key format projection so filter/prune can resolve projected indices.
333    pub(crate) fn format_projection(&self) -> &FormatProjection {
334        match &self.parquet_adapter {
335            ParquetAdapter::Flat(p) => &p.format_projection,
336            ParquetAdapter::PrimaryKeyToFlat(p) => &p.format_projection,
337        }
338    }
339
340    /// Returns `true` if raw batches from parquet use the flat layout and
341    /// stores primary key columns as raw columns.
342    /// Returns `false` for the legacy primary-key-to-flat conversion path.
343    pub(crate) fn batch_has_raw_pk_columns(&self) -> bool {
344        matches!(&self.parquet_adapter, ParquetAdapter::Flat(_))
345    }
346
347    /// Creates a sequence array to override.
348    pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
349        self.override_sequence
350            .map(|seq| Arc::new(UInt64Array::from_value(seq, length)) as ArrayRef)
351    }
352
353    /// Convert a record batch to apply flat format conversion and override sequence array.
354    ///
355    /// Returns a new RecordBatch with flat format conversion applied first (if enabled),
356    /// then the sequence column replaced by the override sequence array.
357    pub(crate) fn convert_batch(
358        &self,
359        record_batch: RecordBatch,
360        override_sequence_array: Option<&ArrayRef>,
361    ) -> Result<RecordBatch> {
362        let record_batch = if let Some(dict_schema) = &self.pk_dict_wrap_schema {
363            wrap_pk_binary_to_dict(record_batch, dict_schema)?
364        } else {
365            record_batch
366        };
367
368        // First, apply flat format conversion.
369        let batch = match &self.parquet_adapter {
370            ParquetAdapter::Flat(_) => record_batch,
371            ParquetAdapter::PrimaryKeyToFlat(p) => p.convert_batch(record_batch)?,
372        };
373
374        // Then apply sequence override if provided
375        let Some(override_array) = override_sequence_array else {
376            return Ok(batch);
377        };
378
379        let mut columns = batch.columns().to_vec();
380        let sequence_column_idx = sequence_column_index(batch.num_columns());
381
382        // Use the provided override sequence array, slicing if necessary to match batch length
383        let sequence_array = if override_array.len() > batch.num_rows() {
384            override_array.slice(0, batch.num_rows())
385        } else {
386            override_array.clone()
387        };
388
389        columns[sequence_column_idx] = sequence_array;
390
391        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
392    }
393
394    /// Checks whether the batch from the parquet file needs to be converted to match the flat format.
395    ///
396    /// * `metadata` is the region metadata (always assumes flat format).
397    /// * `num_columns` is the number of columns in the parquet file.
398    /// * `file_path` is the path to the parquet file, for error message.
399    pub(crate) fn is_legacy_format(
400        metadata: &RegionMetadata,
401        num_columns: usize,
402        file_path: &str,
403    ) -> Result<bool> {
404        if metadata.primary_key.is_empty() {
405            return Ok(false);
406        }
407
408        // For flat format, compute expected column number:
409        // all columns + internal columns (pk, sequence, op_type)
410        let expected_columns = metadata.column_metadatas.len() + INTERNAL_COLUMN_NUM;
411
412        if expected_columns == num_columns {
413            // Same number of columns, no conversion needed
414            Ok(false)
415        } else {
416            ensure!(
417                expected_columns >= num_columns,
418                InvalidParquetSnafu {
419                    file: file_path,
420                    reason: format!(
421                        "Expected columns {} should be >= actual columns {}",
422                        expected_columns, num_columns
423                    )
424                }
425            );
426
427            // Different number of columns, check if the difference matches primary key count
428            let column_diff = expected_columns - num_columns;
429
430            ensure!(
431                column_diff == metadata.primary_key.len(),
432                InvalidParquetSnafu {
433                    file: file_path,
434                    reason: format!(
435                        "Column number difference {} does not match primary key count {}",
436                        column_diff,
437                        metadata.primary_key.len()
438                    )
439                }
440            );
441
442            Ok(true)
443        }
444    }
445}
446
447fn prune_schema_by_nested_paths<'a, I>(schema: &mut Schema, nested_paths: I)
448where
449    I: IntoIterator<Item = &'a [NestedPath]>,
450{
451    let fields = schema
452        .fields
453        .into_iter()
454        .zip(nested_paths)
455        .map(|(field, paths)| {
456            if matches!(field.data_type(), ArrowDataType::Struct(_)) && !paths.is_empty() {
457                let child_paths = paths
458                    .iter()
459                    .map(|path| {
460                        if path.first().is_some_and(|root| root == field.name()) {
461                            &path[1..]
462                        } else {
463                            path
464                        }
465                    })
466                    .collect::<Vec<_>>();
467                prune_field_by_nested_paths(field, &child_paths)
468            } else {
469                field.clone()
470            }
471        })
472        .collect::<Vec<_>>();
473    schema.fields = fields.into()
474}
475
476fn prune_field_by_nested_paths(field: &FieldRef, nested_paths: &[&[String]]) -> FieldRef {
477    let ArrowDataType::Struct(fields) = field.data_type() else {
478        return field.clone();
479    };
480
481    let pruned_fields = fields
482        .iter()
483        .filter_map(|field| {
484            let child_paths = nested_paths
485                .iter()
486                .filter_map(|path| {
487                    path.first()
488                        .is_some_and(|name| name == field.name())
489                        .then_some(&path[1..])
490                })
491                .collect::<Vec<_>>();
492
493            if child_paths.is_empty() {
494                None
495            } else if child_paths.iter().any(|path| path.is_empty()) {
496                Some(field.clone())
497            } else {
498                Some(prune_field_by_nested_paths(field, &child_paths))
499            }
500        })
501        .collect::<Vec<_>>();
502
503    Arc::new(
504        field
505            .as_ref()
506            .clone()
507            .with_data_type(ArrowDataType::Struct(pruned_fields.into())),
508    )
509}
510
/// Wraps the parquet helper for different formats.
enum ParquetAdapter {
    /// The file already uses the flat layout and is read directly.
    Flat(ParquetFlat),
    /// The file uses the legacy primary-key layout and is read through
    /// the primary-key format helper, optionally converting batches to flat.
    PrimaryKeyToFlat(ParquetPrimaryKeyToFlat),
}
516
/// Helper to read parquet files in the primary key format and convert them into the flat format.
struct ParquetPrimaryKeyToFlat {
    /// The primary key format to read the parquet.
    format: PrimaryKeyReadFormat,
    /// Format converter for handling flat format conversion.
    /// `None` means batches are passed through without conversion.
    convert_format: Option<FlatConvertFormat>,
    /// Projection computed for the flat format.
    /// When auto conversion is skipped, this is the projection of the primary key format instead.
    format_projection: FormatProjection,
}
526
527impl ParquetPrimaryKeyToFlat {
528    /// Creates a helper with existing `metadata` and `column_ids` to read.
529    fn new(
530        metadata: RegionMetadataRef,
531        read_cols: ReadColumns,
532        skip_auto_convert: bool,
533    ) -> ParquetPrimaryKeyToFlat {
534        assert!(if skip_auto_convert {
535            metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse
536        } else {
537            true
538        });
539
540        // Creates a map to lookup index based on the new format.
541        let id_to_index = sst_column_id_indices(&metadata);
542        let sst_column_num =
543            flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default());
544
545        let codec = build_primary_key_codec(&metadata);
546        let format = PrimaryKeyReadFormat::new(metadata.clone(), read_cols.clone());
547        let (convert_format, format_projection) = if skip_auto_convert {
548            (
549                None,
550                FormatProjection {
551                    parquet_read_cols: format.parquet_read_columns().clone(),
552                    column_id_to_projected_index: format.field_id_to_projected_index().clone(),
553                },
554            )
555        } else {
556            // Computes the format projection for the new format.
557            let format_projection = FormatProjection::compute_format_projection(
558                &id_to_index,
559                sst_column_num,
560                read_cols.clone(),
561            );
562            (
563                FlatConvertFormat::new(Arc::clone(&metadata), &format_projection, codec),
564                format_projection,
565            )
566        };
567
568        Self {
569            format,
570            convert_format,
571            format_projection,
572        }
573    }
574
575    fn convert_batch(&self, record_batch: RecordBatch) -> Result<RecordBatch> {
576        if let Some(convert_format) = &self.convert_format {
577            convert_format.convert(record_batch)
578        } else {
579            Ok(record_batch)
580        }
581    }
582}
583
/// Helper to read parquet files in the flat format directly.
struct ParquetFlat {
    /// The metadata stored in the SST.
    metadata: RegionMetadataRef,
    /// SST file schema.
    arrow_schema: SchemaRef,
    /// Projection computed for the flat format.
    format_projection: FormatProjection,
    /// Column id to index in SST.
    /// Used to locate the statistics of a column in the row group metadata.
    column_id_to_sst_index: HashMap<ColumnId, usize>,
}
595
596impl ParquetFlat {
597    /// Creates a helper with existing `metadata` and `column_ids` to read.
598    fn new(
599        metadata: RegionMetadataRef,
600        read_cols: ReadColumns,
601        arrow_schema: SchemaRef,
602    ) -> ParquetFlat {
603        // Creates a map to lookup index.
604        let id_to_index = sst_column_id_indices(&metadata);
605        let sst_column_num =
606            flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default());
607        let format_projection =
608            FormatProjection::compute_format_projection(&id_to_index, sst_column_num, read_cols);
609
610        Self {
611            metadata,
612            arrow_schema,
613            format_projection,
614            column_id_to_sst_index: id_to_index,
615        }
616    }
617
618    /// Returns min values of specific column in row groups.
619    fn min_values(
620        &self,
621        row_groups: &[impl Borrow<RowGroupMetaData>],
622        column_id: ColumnId,
623    ) -> StatValues {
624        self.get_stat_values(row_groups, column_id, true)
625    }
626
627    /// Returns max values of specific column in row groups.
628    fn max_values(
629        &self,
630        row_groups: &[impl Borrow<RowGroupMetaData>],
631        column_id: ColumnId,
632    ) -> StatValues {
633        self.get_stat_values(row_groups, column_id, false)
634    }
635
636    /// Returns null counts of specific column in row groups.
637    fn null_counts(
638        &self,
639        row_groups: &[impl Borrow<RowGroupMetaData>],
640        column_id: ColumnId,
641    ) -> StatValues {
642        let Some(index) = self.column_id_to_sst_index.get(&column_id) else {
643            // No such column in the SST.
644            return StatValues::NoColumn;
645        };
646
647        let stats = column_null_counts(row_groups, *index);
648        StatValues::from_stats_opt(stats)
649    }
650
651    fn get_stat_values(
652        &self,
653        row_groups: &[impl Borrow<RowGroupMetaData>],
654        column_id: ColumnId,
655        is_min: bool,
656    ) -> StatValues {
657        let Some(column) = self.metadata.column_by_id(column_id) else {
658            // No such column in the SST.
659            return StatValues::NoColumn;
660        };
661        // Safety: `column_id_to_sst_index` is built from `metadata`.
662        let index = self.column_id_to_sst_index.get(&column_id).unwrap();
663
664        let stats = column_values(row_groups, column, *index, is_min);
665        StatValues::from_stats_opt(stats)
666    }
667}
668
669/// Returns a map that the key is the column id and the value is the column position
670/// in the SST.
671/// It only supports SSTs with raw primary key columns.
672pub(crate) fn sst_column_id_indices(metadata: &RegionMetadata) -> HashMap<ColumnId, usize> {
673    let mut id_to_index = HashMap::with_capacity(metadata.column_metadatas.len());
674    let mut column_index = 0;
675    // keys
676    for pk_id in &metadata.primary_key {
677        id_to_index.insert(*pk_id, column_index);
678        column_index += 1;
679    }
680    // fields
681    for column in &metadata.column_metadatas {
682        if column.semantic_type == SemanticType::Field {
683            id_to_index.insert(column.column_id, column_index);
684            column_index += 1;
685        }
686    }
687    // time index
688    id_to_index.insert(metadata.time_index_column().column_id, column_index);
689
690    id_to_index
691}
692
693/// Decodes primary keys from a batch and returns decoded primary key information.
694///
695/// The batch must contain a primary key column at the expected index.
696pub(crate) fn decode_primary_keys(
697    codec: &dyn PrimaryKeyCodec,
698    batch: &RecordBatch,
699) -> Result<DecodedPrimaryKeys> {
700    let primary_key_index = primary_key_column_index(batch.num_columns());
701    let pk_dict_array = batch
702        .column(primary_key_index)
703        .as_any()
704        .downcast_ref::<PrimaryKeyArray>()
705        .with_context(|| InvalidRecordBatchSnafu {
706            reason: "Primary key column is not a dictionary array".to_string(),
707        })?;
708    let pk_values_array = pk_dict_array
709        .values()
710        .as_any()
711        .downcast_ref::<BinaryArray>()
712        .with_context(|| InvalidRecordBatchSnafu {
713            reason: "Primary key values are not binary array".to_string(),
714        })?;
715
716    let keys = pk_dict_array.keys();
717
718    // Decodes primary key values by iterating through keys, reusing decoded values for duplicate keys.
719    // Maps original key index -> new decoded value index
720    let mut key_to_decoded_index = Vec::with_capacity(keys.len());
721    let mut decoded_pk_values = Vec::new();
722    let mut prev_key: Option<u32> = None;
723
724    // The parquet reader may read the whole dictionary page into the dictionary values, so
725    // we may decode many primary keys not in this batch if we decode the values array directly.
726    let pk_indices = keys.values();
727    for &current_key in pk_indices.iter().take(keys.len()) {
728        // Check if current key is the same as previous key
729        if let Some(prev) = prev_key
730            && prev == current_key
731        {
732            // Reuse the last decoded index
733            key_to_decoded_index.push((decoded_pk_values.len() - 1) as u32);
734            continue;
735        }
736
737        // New key, decodes the value
738        let pk_bytes = pk_values_array.value(current_key as usize);
739        let decoded_value = codec.decode(pk_bytes).context(DecodeSnafu)?;
740
741        decoded_pk_values.push(decoded_value);
742        key_to_decoded_index.push((decoded_pk_values.len() - 1) as u32);
743        prev_key = Some(current_key);
744    }
745
746    // Create the keys array from key_to_decoded_index
747    let keys_array = UInt32Array::from(key_to_decoded_index);
748
749    Ok(DecodedPrimaryKeys {
750        decoded_pk_values,
751        keys_array,
752    })
753}
754
/// Holds decoded primary key values and their indices.
///
/// Built by `decode_primary_keys`.
pub(crate) struct DecodedPrimaryKeys {
    /// Decoded primary key values, one entry per run of consecutive rows
    /// that share the same dictionary key.
    decoded_pk_values: Vec<CompositeValues>,
    /// Prebuilt keys array for creating dictionary arrays.
    /// Entry `i` is the index into `decoded_pk_values` for row `i`.
    keys_array: UInt32Array,
}
762
763impl DecodedPrimaryKeys {
764    /// Gets a tag column array by column id and data type.
765    ///
766    /// For sparse encoding, uses column_id to lookup values.
767    /// For dense encoding, uses pk_index to get values.
768    pub(crate) fn get_tag_column(
769        &self,
770        column_id: ColumnId,
771        pk_index: Option<usize>,
772        column_type: &ConcreteDataType,
773    ) -> Result<ArrayRef> {
774        // Gets values from the primary key.
775        let mut builder = column_type.create_mutable_vector(self.decoded_pk_values.len());
776        for decoded in &self.decoded_pk_values {
777            match decoded {
778                CompositeValues::Dense(dense) => {
779                    let pk_idx = pk_index.expect("pk_index required for dense encoding");
780                    if pk_idx < dense.len() {
781                        builder.push_value_ref(&dense[pk_idx].1.as_value_ref());
782                    } else {
783                        builder.push_null();
784                    }
785                }
786                CompositeValues::Sparse(sparse) => {
787                    let value = sparse.get_or_null(column_id);
788                    builder.push_value_ref(&value.as_value_ref());
789                }
790            };
791        }
792
793        let values_vector = builder.to_vector();
794        let values_array = values_vector.to_arrow_array();
795
796        // Only creates dictionary array for string types, otherwise take values by keys
797        if column_type.is_string() {
798            // Creates dictionary array using the same keys for string types
799            // Note that the dictionary values may have nulls.
800            let dict_array = DictionaryArray::new(self.keys_array.clone(), values_array);
801            Ok(Arc::new(dict_array))
802        } else {
803            // For non-string types, takes values by keys indices to create a regular array
804            let taken_array =
805                take(&values_array, &self.keys_array, None).context(ComputeArrowSnafu)?;
806            Ok(taken_array)
807        }
808    }
809}
810
/// Converts a batch that doesn't have decoded primary key columns into a batch that has decoded
/// primary key columns in flat format.
///
/// The decoded tag columns are placed in front of the columns of the input batch.
pub(crate) struct FlatConvertFormat {
    /// Metadata of the region.
    metadata: RegionMetadataRef,
    /// Primary key codec to decode primary keys.
    codec: Arc<dyn PrimaryKeyCodec>,
    /// Projected primary key column information: (column_id, pk_index, column_index in metadata).
    ///
    /// Entries keep the order of `RegionMetadata::primary_key`; `pk_index` is the position
    /// of the column inside the primary key.
    projected_primary_keys: Vec<(ColumnId, usize, usize)>,
}
821
822impl FlatConvertFormat {
823    /// Creates a new `FlatConvertFormat`.
824    ///
825    /// The `format_projection` is the projection computed in the [FlatReadFormat] with the `metadata`.
826    /// The `codec` is the primary key codec of the `metadata`.
827    ///
828    /// Returns `None` if there is no primary key.
829    pub(crate) fn new(
830        metadata: RegionMetadataRef,
831        format_projection: &FormatProjection,
832        codec: Arc<dyn PrimaryKeyCodec>,
833    ) -> Option<Self> {
834        if metadata.primary_key.is_empty() {
835            return None;
836        }
837
838        // Builds projected primary keys list maintaining the order of RegionMetadata::primary_key
839        let mut projected_primary_keys = Vec::new();
840        for (pk_index, &column_id) in metadata.primary_key.iter().enumerate() {
841            if format_projection
842                .column_id_to_projected_index
843                .contains_key(&column_id)
844            {
845                // We expect the format_projection is built from the metadata.
846                let column_index = metadata.column_index_by_id(column_id).unwrap();
847                projected_primary_keys.push((column_id, pk_index, column_index));
848            }
849        }
850
851        Some(Self {
852            metadata,
853            codec,
854            projected_primary_keys,
855        })
856    }
857
858    /// Converts a batch to have decoded primary key columns in flat format.
859    ///
860    /// The primary key array in the batch is a dictionary array.
861    pub(crate) fn convert(&self, batch: RecordBatch) -> Result<RecordBatch> {
862        if self.projected_primary_keys.is_empty() {
863            return Ok(batch);
864        }
865
866        let decoded_pks = decode_primary_keys(self.codec.as_ref(), &batch)?;
867
868        // Builds decoded tag column arrays.
869        let mut decoded_columns = Vec::new();
870        for (column_id, pk_index, column_index) in &self.projected_primary_keys {
871            let column_metadata = &self.metadata.column_metadatas[*column_index];
872            let tag_column = decoded_pks.get_tag_column(
873                *column_id,
874                Some(*pk_index),
875                &column_metadata.column_schema.data_type,
876            )?;
877            decoded_columns.push(tag_column);
878        }
879
880        // Builds new columns: decoded tag columns first, then original columns
881        let mut new_columns = Vec::with_capacity(batch.num_columns() + decoded_columns.len());
882        new_columns.extend(decoded_columns);
883        new_columns.extend_from_slice(batch.columns());
884
885        // Builds new schema
886        let mut new_fields =
887            Vec::with_capacity(batch.schema().fields().len() + self.projected_primary_keys.len());
888        for (column_id, _, column_index) in &self.projected_primary_keys {
889            let column_metadata = &self.metadata.column_metadatas[*column_index];
890            let old_field = &self.metadata.schema.arrow_schema().fields()[*column_index];
891            let field =
892                tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, old_field);
893            new_fields.push(Arc::new(with_field_id((*field).clone(), *column_id)));
894        }
895        new_fields.extend(batch.schema().fields().iter().cloned());
896
897        let new_schema = Arc::new(Schema::new(new_fields));
898        RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)
899    }
900}
901
#[cfg(test)]
impl FlatReadFormat {
    /// Test helper that builds a read format projecting every column of `metadata`.
    ///
    /// # Panics
    ///
    /// Panics if the read format cannot be created.
    pub fn new_with_all_columns(metadata: RegionMetadataRef) -> FlatReadFormat {
        let all_column_ids = metadata
            .column_metadatas
            .iter()
            .map(|column| column.column_id);
        let read_columns = ReadColumns::from_deduped_column_ids(all_column_ids);

        Self::new(metadata.clone(), read_columns, None, "test", false).unwrap()
    }
}
918
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use arrow_schema::Field;
    use datatypes::arrow::array::{
        ArrayRef, BinaryArray, TimestampMillisecondArray, UInt8Array, UInt32Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::DataType as ArrowDataType;
    use datatypes::arrow::record_batch::RecordBatch;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;
    use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;

    use super::*;
    use crate::read::read_columns::ReadColumns;
    use crate::sst::{
        FlatSchemaOptions, PARQUET_FIELD_ID_KEY, PRIMARY_KEY_PARQUET_FIELD_ID,
        flat_sst_arrow_schema_column_num, override_pk_field_to_binary, to_flat_sst_arrow_schema,
    };

    /// Creates a `RegionMetadata` with `num_tags` nullable string tags, `num_fields`
    /// nullable uint64 fields and a millisecond time index named `ts`.
    ///
    /// Column ids are assigned sequentially from 0 in that order, and the primary key
    /// consists of all the tags.
    fn build_metadata(
        num_tags: usize,
        num_fields: usize,
        encoding: PrimaryKeyEncoding,
    ) -> RegionMetadata {
        let tag_columns = (0..num_tags).map(|i| {
            (
                format!("tag_{i}"),
                ConcreteDataType::string_datatype(),
                SemanticType::Tag,
            )
        });
        let field_columns = (0..num_fields).map(|i| {
            (
                format!("field_{i}"),
                ConcreteDataType::uint64_datatype(),
                SemanticType::Field,
            )
        });

        let mut builder = RegionMetadataBuilder::new(RegionId::new(0, 0));
        let mut next_column_id = 0u32;
        for (name, data_type, semantic_type) in tag_columns.chain(field_columns) {
            builder.push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(name, data_type, true),
                semantic_type,
                column_id: next_column_id,
            });
            next_column_id += 1;
        }

        // The time index always comes last and takes the next free column id.
        builder.push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new(
                "ts".to_string(),
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id: next_column_id,
        });

        let primary_key: Vec<u32> = (0..num_tags as u32).collect();
        builder.primary_key(primary_key);
        builder.primary_key_encoding(encoding);
        builder.build().unwrap()
    }

    #[test]
    fn test_field_column_start() {
        // Each case is (num_tags, num_fields, encoding, expected field column start).
        let cases = [
            (1, 1, PrimaryKeyEncoding::Dense, 1),
            (2, 2, PrimaryKeyEncoding::Dense, 2),
            (0, 2, PrimaryKeyEncoding::Dense, 0),
            (2, 2, PrimaryKeyEncoding::Sparse, 0),
        ];

        for (num_tags, num_fields, encoding, expected) in cases {
            let metadata = build_metadata(num_tags, num_fields, encoding);
            let options = FlatSchemaOptions::from_encoding(encoding);
            let actual = field_column_start(
                &metadata,
                flat_sst_arrow_schema_column_num(&metadata, &options),
            );
            assert_eq!(
                actual, expected,
                "num_tags={num_tags}, num_fields={num_fields}, encoding={encoding:?}"
            );
        }
    }

    #[test]
    fn test_convert_batch_wraps_binary_pk_to_dict() {
        use datatypes::arrow::array::{Array, DictionaryArray, StringArray};
        use datatypes::arrow::datatypes::UInt32Type;

        // build_metadata(1, 1, Dense) projects to:
        // [tag_0: Dict<UInt32, Utf8>, field_0: UInt64, ts: Timestamp(ms),
        //  __primary_key: Dict<UInt32, Binary>, __sequence: UInt64, __op_type: UInt8]
        let metadata = Arc::new(build_metadata(1, 1, PrimaryKeyEncoding::Dense));
        let all_column_ids: Vec<u32> = metadata
            .column_metadatas
            .iter()
            .map(|column| column.column_id)
            .collect();
        let read_columns = ReadColumns::from_deduped_column_ids(all_column_ids);
        let mut read_format =
            FlatReadFormat::new(metadata.clone(), read_columns, None, "test", false).unwrap();
        read_format.set_pk_as_binary().unwrap();

        let output_schema = read_format.output_arrow_schema().unwrap();
        let binary_schema = override_pk_field_to_binary(&output_schema);

        // Converting __primary_key from dictionary to plain binary must not drop
        // the field_id metadata of the field.
        let expected_field_id = PRIMARY_KEY_PARQUET_FIELD_ID.to_string();
        let pk_field = binary_schema
            .field_with_name(PRIMARY_KEY_COLUMN_NAME)
            .unwrap();
        assert_eq!(
            pk_field.metadata().get(PARQUET_FIELD_ID_KEY),
            Some(&expected_field_id),
            "__primary_key field must retain its PARQUET:field_id after override_pk_field_to_binary"
        );

        // The second pk is repeated to verify identity keys (no dedup).
        let pk_values: [&[u8]; 3] = [b"alpha", b"beta", b"beta"];
        let columns: Vec<ArrayRef> = vec![
            // tag_0
            Arc::new(DictionaryArray::<UInt32Type>::new(
                UInt32Array::from(vec![0u32, 1, 1]),
                Arc::new(StringArray::from(vec!["t0", "t1"])),
            )),
            // field_0
            Arc::new(UInt64Array::from(vec![10u64, 11, 12])),
            // ts
            Arc::new(TimestampMillisecondArray::from(vec![1i64, 2, 3])),
            // __primary_key (plain binary)
            Arc::new(BinaryArray::from_iter_values(pk_values)),
            // __sequence
            Arc::new(UInt64Array::from(vec![100u64, 101, 102])),
            // __op_type
            Arc::new(UInt8Array::from(vec![1u8, 1, 1])),
        ];
        let batch = RecordBatch::try_new(binary_schema, columns).unwrap();

        let wrapped = read_format.convert_batch(batch, None).unwrap();
        assert_eq!(wrapped.schema(), output_schema);

        let pk_col = wrapped.column(primary_key_column_index(wrapped.num_columns()));
        let expected_pk_type = ArrowDataType::Dictionary(
            Box::new(ArrowDataType::UInt32),
            Box::new(ArrowDataType::Binary),
        );
        assert_eq!(pk_col.data_type(), &expected_pk_type);

        let dict = pk_col
            .as_any()
            .downcast_ref::<DictionaryArray<UInt32Type>>()
            .unwrap();
        assert_eq!(dict.keys().values(), &[0, 1, 2]);

        let dict_values = dict
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        for (index, expected) in pk_values.into_iter().enumerate() {
            assert_eq!(dict_values.value(index), expected);
        }
    }

    #[test]
    fn test_output_arrow_schema_uses_projection() {
        let metadata = Arc::new(build_metadata(1, 2, PrimaryKeyEncoding::Dense));
        let read_columns = ReadColumns::from_deduped_column_ids([0_u32, 2_u32]);
        let read_format =
            FlatReadFormat::new(metadata.clone(), read_columns, None, "test", false).unwrap();

        let output_schema = read_format.output_arrow_schema().unwrap();

        // The output schema must equal the full flat schema projected by the root indices.
        let root_indices = read_format.parquet_read_columns().root_indices();
        let full_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let expected = Arc::new(full_schema.project(root_indices).unwrap());

        assert_eq!(expected, output_schema);
    }

    #[test]
    fn test_prune_schema_by_nested_paths() {
        /// Creates a nullable field.
        fn new_field(name: &str, data_type: ArrowDataType) -> FieldRef {
            Arc::new(Field::new(name, data_type, true))
        }

        /// Creates a nullable struct field with the given children.
        fn struct_field(name: &str, fields: impl IntoIterator<Item = FieldRef>) -> FieldRef {
            new_field(name, ArrowDataType::Struct(fields.into_iter().collect()))
        }

        // Fields that appear unchanged in both the input and the expected schema.
        let j_c = || {
            struct_field(
                "c",
                [
                    new_field("d", ArrowDataType::Int64),
                    new_field("e", ArrowDataType::Utf8),
                ],
            )
        };
        let tag_field = || new_field("tag", ArrowDataType::Utf8);
        let k_field = || {
            struct_field(
                "k",
                [
                    new_field("k_0", ArrowDataType::Int64),
                    new_field("k_1", ArrowDataType::Utf8),
                ],
            )
        };

        let j_z = struct_field(
            "z",
            [
                new_field("q", ArrowDataType::Boolean),
                new_field("r", ArrowDataType::Float64),
            ],
        );
        let j_a = struct_field(
            "a",
            [
                new_field("x", ArrowDataType::Int64),
                new_field("y", ArrowDataType::Utf8),
                j_z,
            ],
        );
        let mut schema = Schema::new([
            struct_field("j", [j_a, new_field("b", ArrowDataType::Utf8), j_c()]),
            tag_field(),
            k_field(),
        ]);

        // One list of nested paths per root column; only `j` is pruned.
        let nested_paths = [
            vec![
                ["j", "a", "x"]
                    .iter()
                    .map(|segment| segment.to_string())
                    .collect(),
                ["j", "a", "z", "q"]
                    .iter()
                    .map(|segment| segment.to_string())
                    .collect(),
                ["j", "c"]
                    .iter()
                    .map(|segment| segment.to_string())
                    .collect(),
            ],
            vec![],
            vec![],
        ];

        prune_schema_by_nested_paths(
            &mut schema,
            nested_paths.iter().map(|paths| paths.as_slice()),
        );

        let pruned_a = struct_field(
            "a",
            [
                new_field("x", ArrowDataType::Int64),
                struct_field("z", [new_field("q", ArrowDataType::Boolean)]),
            ],
        );
        let expected = Schema::new([
            struct_field("j", [pruned_a, j_c()]),
            tag_field(),
            k_field(),
        ]);

        assert_eq!(schema, expected);
    }
}