// mito2/sst/parquet/flat_format.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Format to store in parquet.
16//!
17//! It can store both encoded primary key and raw key columns.
18//!
//! We store three additional internal columns at last:
20//! - `__primary_key`, the encoded primary key of the row (tags). Type: dictionary(uint32, binary)
21//! - `__sequence`, the sequence number of a row. Type: uint64
22//! - `__op_type`, the op type of the row. Type: uint8
23//!
//! The format is
//! ```text
//! primary key columns, field columns, time index, encoded primary key, __sequence, __op_type.
//! ```
//!
//! It stores field columns in the same order as [RegionMetadata::field_columns()](store_api::metadata::RegionMetadata::field_columns())
//! and stores primary key columns in the same order as [RegionMetadata::primary_key].
30
31use std::borrow::Borrow;
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use api::v1::SemanticType;
36use datatypes::arrow::array::{
37    Array, ArrayRef, BinaryArray, DictionaryArray, UInt32Array, UInt64Array,
38};
39use datatypes::arrow::compute::kernels::take::take;
40use datatypes::arrow::datatypes::{Schema, SchemaRef};
41use datatypes::arrow::record_batch::RecordBatch;
42use datatypes::prelude::{ConcreteDataType, DataType};
43use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec, build_primary_key_codec};
44use parquet::file::metadata::RowGroupMetaData;
45use snafu::{OptionExt, ResultExt, ensure};
46use store_api::codec::PrimaryKeyEncoding;
47use store_api::metadata::{RegionMetadata, RegionMetadataRef};
48use store_api::storage::{ColumnId, SequenceNumber};
49
50use crate::error::{
51    ComputeArrowSnafu, DecodeSnafu, InvalidParquetSnafu, InvalidRecordBatchSnafu,
52    NewRecordBatchSnafu, Result,
53};
54use crate::sst::parquet::format::{
55    FIXED_POS_COLUMN_NUM, FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray,
56    PrimaryKeyReadFormat, ReadFormat, StatValues,
57};
58use crate::sst::{
59    FlatSchemaOptions, flat_sst_arrow_schema_column_num, tag_maybe_to_dictionary_field,
60    to_flat_sst_arrow_schema,
61};
62
/// Helper for writing the SST format.
pub(crate) struct FlatWriteFormat {
    /// SST file schema.
    arrow_schema: SchemaRef,
    /// Optional sequence number that replaces the `__sequence` column on write.
    /// `None` means the batch's own sequences are stored unchanged.
    override_sequence: Option<SequenceNumber>,
}
69
70impl FlatWriteFormat {
71    /// Creates a new helper.
72    pub(crate) fn new(metadata: RegionMetadataRef, options: &FlatSchemaOptions) -> FlatWriteFormat {
73        let arrow_schema = to_flat_sst_arrow_schema(&metadata, options);
74        FlatWriteFormat {
75            arrow_schema,
76            override_sequence: None,
77        }
78    }
79
80    /// Set override sequence.
81    pub(crate) fn with_override_sequence(
82        mut self,
83        override_sequence: Option<SequenceNumber>,
84    ) -> Self {
85        self.override_sequence = override_sequence;
86        self
87    }
88
89    /// Gets the arrow schema to store in parquet.
90    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
91        &self.arrow_schema
92    }
93
94    /// Convert `batch` to a arrow record batch to store in parquet.
95    pub(crate) fn convert_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
96        debug_assert_eq!(batch.num_columns(), self.arrow_schema.fields().len());
97
98        let Some(override_sequence) = self.override_sequence else {
99            return Ok(batch.clone());
100        };
101
102        let mut columns = batch.columns().to_vec();
103        let sequence_array = Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
104        columns[sequence_column_index(batch.num_columns())] = sequence_array;
105
106        RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
107    }
108}
109
/// Returns the position of the sequence column.
///
/// In a flat batch the tail is: time index, primary key, `__sequence`, `__op_type`,
/// so the sequence column is the second column from the end.
pub(crate) fn sequence_column_index(num_columns: usize) -> usize {
    const OFFSET_FROM_END: usize = 2;
    num_columns - OFFSET_FROM_END
}
114
/// Returns the position of the time index column.
///
/// The time index is the fourth column from the end, right before the three
/// internal columns (`__primary_key`, `__sequence`, `__op_type`).
pub(crate) fn time_index_column_index(num_columns: usize) -> usize {
    const OFFSET_FROM_END: usize = 4;
    num_columns - OFFSET_FROM_END
}
119
/// Returns the position of the primary key column.
///
/// The encoded primary key is the third column from the end of a flat batch.
pub(crate) fn primary_key_column_index(num_columns: usize) -> usize {
    const OFFSET_FROM_END: usize = 3;
    num_columns - OFFSET_FROM_END
}
124
/// Returns the position of the op type key column.
///
/// `__op_type` is always the last column of a flat batch.
pub(crate) fn op_type_column_index(num_columns: usize) -> usize {
    const OFFSET_FROM_END: usize = 1;
    num_columns - OFFSET_FROM_END
}
129
130/// Returns the start index of field columns in a flat batch.
131///
132/// `num_columns` is the total number of columns in the flat batch schema,
133/// including tag columns (if present), field columns, and fixed position columns
134/// (time index, primary key, sequence, op type).
135///
136/// For Dense encoding (raw PK columns included): field_column_start = primary_key.len()
137/// For Sparse encoding (no raw PK columns): field_column_start = 0
138pub(crate) fn field_column_start(metadata: &RegionMetadata, num_columns: usize) -> usize {
139    // Calculates field column start: total columns - fixed columns - field columns
140    // Field column count = total metadata columns - time index column - primary key columns
141    let field_column_count = metadata.column_metadatas.len() - 1 - metadata.primary_key.len();
142    num_columns - FIXED_POS_COLUMN_NUM - field_column_count
143}
144
// TODO(yingwen): Add an option to skip reading internal columns if the region is
// append only and doesn't use sparse encoding (We need to check the table id under
// sparse encoding).
/// Helper for reading the flat SST format with projection.
///
/// It only supports flat format that stores primary keys additionally.
pub struct FlatReadFormat {
    /// Sequence number to override the sequence read from the SST.
    override_sequence: Option<SequenceNumber>,
    /// Parquet format adapter.
    /// Dispatches between reading the flat layout directly and converting
    /// from the legacy primary-key layout on the fly.
    parquet_adapter: ParquetAdapter,
}
157
impl FlatReadFormat {
    /// Creates a helper with existing `metadata` and `column_ids` to read.
    ///
    /// `num_columns` is the actual column count of the parquet file (if known) and is
    /// used to detect the legacy layout; `file_path` is used for error messages only.
    /// If `skip_auto_convert` is true, skips auto conversion of format when the encoding is sparse encoding.
    pub fn new(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
        num_columns: Option<usize>,
        file_path: &str,
        skip_auto_convert: bool,
    ) -> Result<FlatReadFormat> {
        // Detect the file layout: compare actual vs expected column counts when
        // possible, otherwise fall back to the encoding (sparse-encoded regions
        // historically used the primary-key layout).
        let is_legacy = match num_columns {
            Some(num) => Self::is_legacy_format(&metadata, num, file_path)?,
            None => metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse,
        };

        let parquet_adapter = if is_legacy {
            // Safety: is_legacy_format() ensures primary_key is not empty.
            if metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse {
                // Only skip auto convert when the primary key encoding is sparse.
                ParquetAdapter::PrimaryKeyToFlat(ParquetPrimaryKeyToFlat::new(
                    metadata,
                    column_ids,
                    skip_auto_convert,
                ))
            } else {
                // Dense encoding always converts to the flat layout.
                ParquetAdapter::PrimaryKeyToFlat(ParquetPrimaryKeyToFlat::new(
                    metadata, column_ids, false,
                ))
            }
        } else {
            ParquetAdapter::Flat(ParquetFlat::new(metadata, column_ids))
        };

        Ok(FlatReadFormat {
            override_sequence: None,
            parquet_adapter,
        })
    }

    /// Sets the sequence number to override.
    pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
        self.override_sequence = sequence;
    }

    /// Index of a column in the projected batch by its column id.
    ///
    /// Returns `None` if the column is not part of the projection.
    pub fn projected_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
        self.format_projection()
            .column_id_to_projected_index
            .get(&column_id)
            .copied()
    }

    /// Returns min values of specific column in row groups.
    pub fn min_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => p.min_values(row_groups, column_id),
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.min_values(row_groups, column_id),
        }
    }

    /// Returns max values of specific column in row groups.
    pub fn max_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => p.max_values(row_groups, column_id),
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.max_values(row_groups, column_id),
        }
    }

    /// Returns null counts of specific column in row groups.
    pub fn null_counts(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => p.null_counts(row_groups, column_id),
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.null_counts(row_groups, column_id),
        }
    }

    /// Gets the arrow schema of the SST file.
    ///
    /// This schema is computed from the region metadata but should be the same
    /// as the arrow schema decoded from the file metadata.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => &p.arrow_schema,
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.arrow_schema(),
        }
    }

    /// Gets the metadata of the SST.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => &p.metadata,
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.metadata(),
        }
    }

    /// Gets sorted projection indices to read from the SST file.
    pub(crate) fn projection_indices(&self) -> &[usize] {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => &p.format_projection.projection_indices,
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.projection_indices(),
        }
    }

    /// Gets the projection in the flat format.
    ///
    /// When `skip_auto_convert` is enabled (primary-key format read), this returns the
    /// primary-key format projection so filter/prune can resolve projected indices.
    pub(crate) fn format_projection(&self) -> &FormatProjection {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => &p.format_projection,
            ParquetAdapter::PrimaryKeyToFlat(p) => &p.format_projection,
        }
    }

    /// Returns `true` if raw batches from parquet use the flat layout and
    /// stores primary key columns as raw columns.
    /// Returns `false` for the legacy primary-key-to-flat conversion path.
    pub(crate) fn batch_has_raw_pk_columns(&self) -> bool {
        matches!(&self.parquet_adapter, ParquetAdapter::Flat(_))
    }

    /// Creates a sequence array to override.
    ///
    /// Returns `None` when no override sequence is set.
    pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
        self.override_sequence
            .map(|seq| Arc::new(UInt64Array::from_value(seq, length)) as ArrayRef)
    }

    /// Convert a record batch to apply flat format conversion and override sequence array.
    ///
    /// Returns a new RecordBatch with flat format conversion applied first (if enabled),
    /// then the sequence column replaced by the override sequence array.
    ///
    /// NOTE(review): assumes `override_sequence_array` is at least as long as the
    /// batch — a shorter array is used unsliced and `RecordBatch::try_new` would
    /// reject the length mismatch. TODO confirm callers size it from the max batch.
    pub(crate) fn convert_batch(
        &self,
        record_batch: RecordBatch,
        override_sequence_array: Option<&ArrayRef>,
    ) -> Result<RecordBatch> {
        // First, apply flat format conversion.
        let batch = match &self.parquet_adapter {
            ParquetAdapter::Flat(_) => record_batch,
            ParquetAdapter::PrimaryKeyToFlat(p) => p.convert_batch(record_batch)?,
        };

        // Then apply sequence override if provided
        let Some(override_array) = override_sequence_array else {
            return Ok(batch);
        };

        let mut columns = batch.columns().to_vec();
        let sequence_column_idx = sequence_column_index(batch.num_columns());

        // Use the provided override sequence array, slicing if necessary to match batch length
        let sequence_array = if override_array.len() > batch.num_rows() {
            override_array.slice(0, batch.num_rows())
        } else {
            override_array.clone()
        };

        columns[sequence_column_idx] = sequence_array;

        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
    }

    /// Checks whether the batch from the parquet file needs to be converted to match the flat format.
    ///
    /// * `metadata` is the region metadata (always assumes flat format).
    /// * `num_columns` is the number of columns in the parquet file.
    /// * `file_path` is the path to the parquet file, for error message.
    ///
    /// A legacy (primary-key layout) file stores exactly `primary_key.len()` fewer
    /// columns than the flat layout, because raw tag columns are not materialized.
    pub(crate) fn is_legacy_format(
        metadata: &RegionMetadata,
        num_columns: usize,
        file_path: &str,
    ) -> Result<bool> {
        // Without a primary key the two layouts have identical column sets.
        if metadata.primary_key.is_empty() {
            return Ok(false);
        }

        // For flat format, compute expected column number:
        // all columns + internal columns (pk, sequence, op_type)
        let expected_columns = metadata.column_metadatas.len() + INTERNAL_COLUMN_NUM;

        if expected_columns == num_columns {
            // Same number of columns, no conversion needed
            Ok(false)
        } else {
            ensure!(
                expected_columns >= num_columns,
                InvalidParquetSnafu {
                    file: file_path,
                    reason: format!(
                        "Expected columns {} should be >= actual columns {}",
                        expected_columns, num_columns
                    )
                }
            );

            // Different number of columns, check if the difference matches primary key count
            let column_diff = expected_columns - num_columns;

            ensure!(
                column_diff == metadata.primary_key.len(),
                InvalidParquetSnafu {
                    file: file_path,
                    reason: format!(
                        "Column number difference {} does not match primary key count {}",
                        column_diff,
                        metadata.primary_key.len()
                    )
                }
            );

            Ok(true)
        }
    }
}
385
/// Wraps the parquet helper for different formats.
enum ParquetAdapter {
    /// Reads SSTs already stored in the flat layout.
    Flat(ParquetFlat),
    /// Reads legacy primary-key layout SSTs, optionally converting them to the flat layout.
    PrimaryKeyToFlat(ParquetPrimaryKeyToFlat),
}
391
/// Helper to reads the parquet from primary key format into the flat format.
struct ParquetPrimaryKeyToFlat {
    /// The primary key format to read the parquet.
    format: PrimaryKeyReadFormat,
    /// Format converter for handling flat format conversion.
    /// `None` when auto conversion is skipped (sparse encoding) or there is no primary key.
    convert_format: Option<FlatConvertFormat>,
    /// Projection computed for the flat format.
    format_projection: FormatProjection,
}
401
402impl ParquetPrimaryKeyToFlat {
403    /// Creates a helper with existing `metadata` and `column_ids` to read.
404    fn new(
405        metadata: RegionMetadataRef,
406        column_ids: impl Iterator<Item = ColumnId>,
407        skip_auto_convert: bool,
408    ) -> ParquetPrimaryKeyToFlat {
409        assert!(if skip_auto_convert {
410            metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse
411        } else {
412            true
413        });
414
415        let column_ids: Vec<_> = column_ids.collect();
416
417        // Creates a map to lookup index based on the new format.
418        let id_to_index = sst_column_id_indices(&metadata);
419        let sst_column_num =
420            flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default());
421
422        let codec = build_primary_key_codec(&metadata);
423        let format = PrimaryKeyReadFormat::new(metadata.clone(), column_ids.iter().copied());
424        let (convert_format, format_projection) = if skip_auto_convert {
425            (
426                None,
427                FormatProjection {
428                    projection_indices: format.projection_indices().to_vec(),
429                    column_id_to_projected_index: format.field_id_to_projected_index().clone(),
430                },
431            )
432        } else {
433            // Computes the format projection for the new format.
434            let format_projection = FormatProjection::compute_format_projection(
435                &id_to_index,
436                sst_column_num,
437                column_ids.iter().copied(),
438            );
439            (
440                FlatConvertFormat::new(Arc::clone(&metadata), &format_projection, codec),
441                format_projection,
442            )
443        };
444
445        Self {
446            format,
447            convert_format,
448            format_projection,
449        }
450    }
451
452    fn convert_batch(&self, record_batch: RecordBatch) -> Result<RecordBatch> {
453        if let Some(convert_format) = &self.convert_format {
454            convert_format.convert(record_batch)
455        } else {
456            Ok(record_batch)
457        }
458    }
459}
460
/// Helper to reads the parquet in flat format directly.
struct ParquetFlat {
    /// The metadata stored in the SST.
    metadata: RegionMetadataRef,
    /// SST file schema.
    arrow_schema: SchemaRef,
    /// Projection computed for the flat format.
    format_projection: FormatProjection,
    /// Column id to index in SST.
    /// Built from `metadata` via [sst_column_id_indices].
    column_id_to_sst_index: HashMap<ColumnId, usize>,
}
472
473impl ParquetFlat {
474    /// Creates a helper with existing `metadata` and `column_ids` to read.
475    fn new(metadata: RegionMetadataRef, column_ids: impl Iterator<Item = ColumnId>) -> ParquetFlat {
476        // Creates a map to lookup index.
477        let id_to_index = sst_column_id_indices(&metadata);
478        let arrow_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
479        let sst_column_num =
480            flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default());
481        let format_projection =
482            FormatProjection::compute_format_projection(&id_to_index, sst_column_num, column_ids);
483
484        Self {
485            metadata,
486            arrow_schema,
487            format_projection,
488            column_id_to_sst_index: id_to_index,
489        }
490    }
491
492    /// Returns min values of specific column in row groups.
493    fn min_values(
494        &self,
495        row_groups: &[impl Borrow<RowGroupMetaData>],
496        column_id: ColumnId,
497    ) -> StatValues {
498        self.get_stat_values(row_groups, column_id, true)
499    }
500
501    /// Returns max values of specific column in row groups.
502    fn max_values(
503        &self,
504        row_groups: &[impl Borrow<RowGroupMetaData>],
505        column_id: ColumnId,
506    ) -> StatValues {
507        self.get_stat_values(row_groups, column_id, false)
508    }
509
510    /// Returns null counts of specific column in row groups.
511    fn null_counts(
512        &self,
513        row_groups: &[impl Borrow<RowGroupMetaData>],
514        column_id: ColumnId,
515    ) -> StatValues {
516        let Some(index) = self.column_id_to_sst_index.get(&column_id) else {
517            // No such column in the SST.
518            return StatValues::NoColumn;
519        };
520
521        let stats = ReadFormat::column_null_counts(row_groups, *index);
522        StatValues::from_stats_opt(stats)
523    }
524
525    fn get_stat_values(
526        &self,
527        row_groups: &[impl Borrow<RowGroupMetaData>],
528        column_id: ColumnId,
529        is_min: bool,
530    ) -> StatValues {
531        let Some(column) = self.metadata.column_by_id(column_id) else {
532            // No such column in the SST.
533            return StatValues::NoColumn;
534        };
535        // Safety: `column_id_to_sst_index` is built from `metadata`.
536        let index = self.column_id_to_sst_index.get(&column_id).unwrap();
537
538        let stats = ReadFormat::column_values(row_groups, column, *index, is_min);
539        StatValues::from_stats_opt(stats)
540    }
541}
542
543/// Returns a map that the key is the column id and the value is the column position
544/// in the SST.
545/// It only supports SSTs with raw primary key columns.
546pub(crate) fn sst_column_id_indices(metadata: &RegionMetadata) -> HashMap<ColumnId, usize> {
547    let mut id_to_index = HashMap::with_capacity(metadata.column_metadatas.len());
548    let mut column_index = 0;
549    // keys
550    for pk_id in &metadata.primary_key {
551        id_to_index.insert(*pk_id, column_index);
552        column_index += 1;
553    }
554    // fields
555    for column in &metadata.column_metadatas {
556        if column.semantic_type == SemanticType::Field {
557            id_to_index.insert(column.column_id, column_index);
558            column_index += 1;
559        }
560    }
561    // time index
562    id_to_index.insert(metadata.time_index_column().column_id, column_index);
563
564    id_to_index
565}
566
/// Decodes primary keys from a batch and returns decoded primary key information.
///
/// The batch must contain a primary key column at the expected index
/// (see [primary_key_column_index]), stored as a dictionary of binary values.
///
/// NOTE(review): the consecutive-duplicate shortcut below assumes rows with the
/// same primary key appear adjacently (sorted by key). Non-adjacent repeats are
/// decoded again — still correct, just less efficient. It also assumes the
/// dictionary keys contain no nulls — TODO confirm upstream guarantees this.
pub(crate) fn decode_primary_keys(
    codec: &dyn PrimaryKeyCodec,
    batch: &RecordBatch,
) -> Result<DecodedPrimaryKeys> {
    let primary_key_index = primary_key_column_index(batch.num_columns());
    let pk_dict_array = batch
        .column(primary_key_index)
        .as_any()
        .downcast_ref::<PrimaryKeyArray>()
        .with_context(|| InvalidRecordBatchSnafu {
            reason: "Primary key column is not a dictionary array".to_string(),
        })?;
    let pk_values_array = pk_dict_array
        .values()
        .as_any()
        .downcast_ref::<BinaryArray>()
        .with_context(|| InvalidRecordBatchSnafu {
            reason: "Primary key values are not binary array".to_string(),
        })?;

    let keys = pk_dict_array.keys();

    // Decodes primary key values by iterating through keys, reusing decoded values for duplicate keys.
    // Maps original key index -> new decoded value index
    let mut key_to_decoded_index = Vec::with_capacity(keys.len());
    let mut decoded_pk_values = Vec::new();
    let mut prev_key: Option<u32> = None;

    // The parquet reader may read the whole dictionary page into the dictionary values, so
    // we may decode many primary keys not in this batch if we decode the values array directly.
    let pk_indices = keys.values();
    for &current_key in pk_indices.iter().take(keys.len()) {
        // Check if current key is the same as previous key
        if let Some(prev) = prev_key
            && prev == current_key
        {
            // Reuse the last decoded index
            key_to_decoded_index.push((decoded_pk_values.len() - 1) as u32);
            continue;
        }

        // New key, decodes the value
        let pk_bytes = pk_values_array.value(current_key as usize);
        let decoded_value = codec.decode(pk_bytes).context(DecodeSnafu)?;

        decoded_pk_values.push(decoded_value);
        key_to_decoded_index.push((decoded_pk_values.len() - 1) as u32);
        prev_key = Some(current_key);
    }

    // Create the keys array from key_to_decoded_index
    let keys_array = UInt32Array::from(key_to_decoded_index);

    Ok(DecodedPrimaryKeys {
        decoded_pk_values,
        keys_array,
    })
}
628
/// Holds decoded primary key values and their indices.
pub(crate) struct DecodedPrimaryKeys {
    /// Decoded primary key values for unique keys in the dictionary.
    decoded_pk_values: Vec<CompositeValues>,
    /// Prebuilt keys array for creating dictionary arrays.
    /// Each entry maps a row to an index in `decoded_pk_values`.
    keys_array: UInt32Array,
}
636
637impl DecodedPrimaryKeys {
638    /// Gets a tag column array by column id and data type.
639    ///
640    /// For sparse encoding, uses column_id to lookup values.
641    /// For dense encoding, uses pk_index to get values.
642    pub(crate) fn get_tag_column(
643        &self,
644        column_id: ColumnId,
645        pk_index: Option<usize>,
646        column_type: &ConcreteDataType,
647    ) -> Result<ArrayRef> {
648        // Gets values from the primary key.
649        let mut builder = column_type.create_mutable_vector(self.decoded_pk_values.len());
650        for decoded in &self.decoded_pk_values {
651            match decoded {
652                CompositeValues::Dense(dense) => {
653                    let pk_idx = pk_index.expect("pk_index required for dense encoding");
654                    if pk_idx < dense.len() {
655                        builder.push_value_ref(&dense[pk_idx].1.as_value_ref());
656                    } else {
657                        builder.push_null();
658                    }
659                }
660                CompositeValues::Sparse(sparse) => {
661                    let value = sparse.get_or_null(column_id);
662                    builder.push_value_ref(&value.as_value_ref());
663                }
664            };
665        }
666
667        let values_vector = builder.to_vector();
668        let values_array = values_vector.to_arrow_array();
669
670        // Only creates dictionary array for string types, otherwise take values by keys
671        if column_type.is_string() {
672            // Creates dictionary array using the same keys for string types
673            // Note that the dictionary values may have nulls.
674            let dict_array = DictionaryArray::new(self.keys_array.clone(), values_array);
675            Ok(Arc::new(dict_array))
676        } else {
677            // For non-string types, takes values by keys indices to create a regular array
678            let taken_array =
679                take(&values_array, &self.keys_array, None).context(ComputeArrowSnafu)?;
680            Ok(taken_array)
681        }
682    }
683}
684
/// Converts a batch that doesn't have decoded primary key columns into a batch that has decoded
/// primary key columns in flat format.
pub(crate) struct FlatConvertFormat {
    /// Metadata of the region.
    metadata: RegionMetadataRef,
    /// Primary key codec to decode primary keys.
    codec: Arc<dyn PrimaryKeyCodec>,
    /// Projected primary key column information: (column_id, pk_index, column_index in metadata).
    /// Kept in `RegionMetadata::primary_key` order.
    projected_primary_keys: Vec<(ColumnId, usize, usize)>,
}
695
696impl FlatConvertFormat {
697    /// Creates a new `FlatConvertFormat`.
698    ///
699    /// The `format_projection` is the projection computed in the [FlatReadFormat] with the `metadata`.
700    /// The `codec` is the primary key codec of the `metadata`.
701    ///
702    /// Returns `None` if there is no primary key.
703    pub(crate) fn new(
704        metadata: RegionMetadataRef,
705        format_projection: &FormatProjection,
706        codec: Arc<dyn PrimaryKeyCodec>,
707    ) -> Option<Self> {
708        if metadata.primary_key.is_empty() {
709            return None;
710        }
711
712        // Builds projected primary keys list maintaining the order of RegionMetadata::primary_key
713        let mut projected_primary_keys = Vec::new();
714        for (pk_index, &column_id) in metadata.primary_key.iter().enumerate() {
715            if format_projection
716                .column_id_to_projected_index
717                .contains_key(&column_id)
718            {
719                // We expect the format_projection is built from the metadata.
720                let column_index = metadata.column_index_by_id(column_id).unwrap();
721                projected_primary_keys.push((column_id, pk_index, column_index));
722            }
723        }
724
725        Some(Self {
726            metadata,
727            codec,
728            projected_primary_keys,
729        })
730    }
731
732    /// Converts a batch to have decoded primary key columns in flat format.
733    ///
734    /// The primary key array in the batch is a dictionary array.
735    pub(crate) fn convert(&self, batch: RecordBatch) -> Result<RecordBatch> {
736        if self.projected_primary_keys.is_empty() {
737            return Ok(batch);
738        }
739
740        let decoded_pks = decode_primary_keys(self.codec.as_ref(), &batch)?;
741
742        // Builds decoded tag column arrays.
743        let mut decoded_columns = Vec::new();
744        for (column_id, pk_index, column_index) in &self.projected_primary_keys {
745            let column_metadata = &self.metadata.column_metadatas[*column_index];
746            let tag_column = decoded_pks.get_tag_column(
747                *column_id,
748                Some(*pk_index),
749                &column_metadata.column_schema.data_type,
750            )?;
751            decoded_columns.push(tag_column);
752        }
753
754        // Builds new columns: decoded tag columns first, then original columns
755        let mut new_columns = Vec::with_capacity(batch.num_columns() + decoded_columns.len());
756        new_columns.extend(decoded_columns);
757        new_columns.extend_from_slice(batch.columns());
758
759        // Builds new schema
760        let mut new_fields =
761            Vec::with_capacity(batch.schema().fields().len() + self.projected_primary_keys.len());
762        for (_, _, column_index) in &self.projected_primary_keys {
763            let column_metadata = &self.metadata.column_metadatas[*column_index];
764            let old_field = &self.metadata.schema.arrow_schema().fields()[*column_index];
765            let field =
766                tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, old_field);
767            new_fields.push(field);
768        }
769        new_fields.extend(batch.schema().fields().iter().cloned());
770
771        let new_schema = Arc::new(Schema::new(new_fields));
772        RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)
773    }
774}
775
#[cfg(test)]
impl FlatReadFormat {
    /// Creates a helper with existing `metadata` that reads every column.
    pub fn new_with_all_columns(metadata: RegionMetadataRef) -> FlatReadFormat {
        // Collect the ids first so `metadata` can be moved into `new`.
        let all_column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect();
        FlatReadFormat::new(metadata, all_column_ids.into_iter(), None, "test", false).unwrap()
    }
}
790
#[cfg(test)]
mod tests {
    use api::v1::SemanticType;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::field_column_start;
    use crate::sst::{FlatSchemaOptions, flat_sst_arrow_schema_column_num};

    /// Builds a `RegionMetadata` with the given number of tags and fields.
    ///
    /// Column ids are assigned sequentially: tags first (0..num_tags), then
    /// fields, then the timestamp column. All tags form the primary key.
    fn build_metadata(
        num_tags: usize,
        num_fields: usize,
        encoding: PrimaryKeyEncoding,
    ) -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(0, 0));
        let mut col_id = 0u32;

        // Tag columns: nullable strings named `tag_{i}`.
        for i in 0..num_tags {
            builder.push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    format!("tag_{i}"),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: col_id,
            });
            col_id += 1;
        }

        // Field columns: nullable uint64s named `field_{i}`.
        for i in 0..num_fields {
            builder.push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    format!("field_{i}"),
                    ConcreteDataType::uint64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: col_id,
            });
            col_id += 1;
        }

        // Time index: non-null millisecond timestamp.
        builder.push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new(
                "ts".to_string(),
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id: col_id,
        });

        // All tag columns (ids 0..num_tags) make up the primary key.
        let primary_key: Vec<u32> = (0..num_tags as u32).collect();
        builder.primary_key(primary_key);
        builder.primary_key_encoding(encoding);
        builder.build().unwrap()
    }

    #[test]
    fn test_field_column_start() {
        // (num_tags, num_fields, encoding, expected)
        // Dense encoding stores raw tag columns, so fields start after them;
        // sparse encoding stores no raw tags, so fields start at 0.
        let cases = [
            (1, 1, PrimaryKeyEncoding::Dense, 1),
            (2, 2, PrimaryKeyEncoding::Dense, 2),
            (0, 2, PrimaryKeyEncoding::Dense, 0),
            (2, 2, PrimaryKeyEncoding::Sparse, 0),
        ];

        for (num_tags, num_fields, encoding, expected) in cases {
            let metadata = build_metadata(num_tags, num_fields, encoding);
            let options = FlatSchemaOptions::from_encoding(encoding);
            let num_columns = flat_sst_arrow_schema_column_num(&metadata, &options);
            let result = field_column_start(&metadata, num_columns);
            assert_eq!(
                result, expected,
                "num_tags={num_tags}, num_fields={num_fields}, encoding={encoding:?}"
            );
        }
    }
}