mito2/sst/parquet/flat_format.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Format to store in parquet.
//!
//! It can store both the encoded primary key and the raw key columns.
//!
//! We store three additional internal columns at the end:
//! - `__primary_key`, the encoded primary key of the row (tags). Type: dictionary(uint32, binary)
//! - `__sequence`, the sequence number of a row. Type: uint64
//! - `__op_type`, the op type of the row. Type: uint8
//!
//! The format is:
//! ```text
//! primary key columns, field columns, time index, encoded primary key, __sequence, __op_type.
//! ```
//!
//! It stores field columns in the same order as [RegionMetadata::field_columns()](store_api::metadata::RegionMetadata::field_columns())
//! and stores primary key columns in the same order as [RegionMetadata::primary_key].

use std::borrow::Borrow;
use std::collections::HashMap;
use std::sync::Arc;

use api::v1::SemanticType;
use datatypes::arrow::array::{
    Array, ArrayRef, BinaryArray, DictionaryArray, UInt32Array, UInt64Array,
};
use datatypes::arrow::compute::kernels::take::take;
use datatypes::arrow::datatypes::{Schema, SchemaRef};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::{ConcreteDataType, DataType};
use mito_codec::row_converter::{build_primary_key_codec, CompositeValues, PrimaryKeyCodec};
use parquet::file::metadata::RowGroupMetaData;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::{ColumnId, SequenceNumber};

use crate::error::{
    ComputeArrowSnafu, DecodeSnafu, InvalidParquetSnafu, InvalidRecordBatchSnafu,
    NewRecordBatchSnafu, Result,
};
use crate::sst::parquet::format::{
    FormatProjection, PrimaryKeyArray, ReadFormat, StatValues, INTERNAL_COLUMN_NUM,
};
use crate::sst::{tag_maybe_to_dictionary_field, to_flat_sst_arrow_schema, FlatSchemaOptions};

/// Helper for writing the flat SST format.
#[allow(dead_code)]
pub(crate) struct FlatWriteFormat {
    metadata: RegionMetadataRef,
    /// SST file schema.
    arrow_schema: SchemaRef,
    override_sequence: Option<SequenceNumber>,
}

impl FlatWriteFormat {
    /// Creates a new helper.
    #[allow(dead_code)]
    pub(crate) fn new(metadata: RegionMetadataRef, options: &FlatSchemaOptions) -> FlatWriteFormat {
        let arrow_schema = to_flat_sst_arrow_schema(&metadata, options);
        FlatWriteFormat {
            metadata,
            arrow_schema,
            override_sequence: None,
        }
    }

    /// Sets the sequence number to override.
    #[allow(dead_code)]
    pub(crate) fn with_override_sequence(
        mut self,
        override_sequence: Option<SequenceNumber>,
    ) -> Self {
        self.override_sequence = override_sequence;
        self
    }

    /// Gets the arrow schema to store in parquet.
    #[allow(dead_code)]
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        &self.arrow_schema
    }

    /// Converts `batch` to an arrow record batch to store in parquet.
    #[allow(dead_code)]
    pub(crate) fn convert_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
        debug_assert_eq!(batch.num_columns(), self.arrow_schema.fields().len());

        let Some(override_sequence) = self.override_sequence else {
            return Ok(batch.clone());
        };

        let mut columns = batch.columns().to_vec();
        let sequence_array = Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
        columns[sequence_column_index(batch.num_columns())] = sequence_array;

        RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
    }
}
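
// A minimal sketch of the sequence override performed in `convert_batch` above,
// using only arrow types. The two-column schema here is a hypothetical stand-in
// for the tail of a flat batch, not the real SST schema.
#[cfg(test)]
mod override_sequence_sketch {
    use std::sync::Arc;

    use datatypes::arrow::array::{ArrayRef, UInt64Array, UInt8Array};
    use datatypes::arrow::datatypes::{DataType, Field, Schema};
    use datatypes::arrow::record_batch::RecordBatch;

    #[test]
    fn replaces_sequence_column() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(UInt64Array::from(vec![1, 2, 3])) as ArrayRef,
                Arc::new(UInt8Array::from(vec![1, 1, 1])) as ArrayRef,
            ],
        )
        .unwrap();

        // Builds a constant sequence array and swaps it into the column vector,
        // exactly as `FlatWriteFormat::convert_batch` does.
        let mut columns = batch.columns().to_vec();
        let idx = super::sequence_column_index(batch.num_columns());
        columns[idx] = Arc::new(UInt64Array::from(vec![42; batch.num_rows()]));
        let new_batch = RecordBatch::try_new(schema, columns).unwrap();
        assert_eq!(new_batch.num_rows(), 3);
    }
}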

/// Returns the position of the sequence column.
pub(crate) fn sequence_column_index(num_columns: usize) -> usize {
    num_columns - 2
}

/// Returns the position of the time index column.
pub(crate) fn time_index_column_index(num_columns: usize) -> usize {
    num_columns - 4
}

/// Returns the position of the primary key column.
pub(crate) fn primary_key_column_index(num_columns: usize) -> usize {
    num_columns - 3
}

/// Returns the position of the op type column.
pub(crate) fn op_type_column_index(num_columns: usize) -> usize {
    num_columns - 1
}
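
// A worked example of the index arithmetic above, assuming a hypothetical
// region with 3 tag columns, 2 field columns and 1 time index, so the flat
// layout has 6 user columns plus the 3 internal columns (9 in total):
// tag0, tag1, tag2, field0, field1, ts, __primary_key, __sequence, __op_type.
#[cfg(test)]
mod column_index_sketch {
    #[test]
    fn internal_column_positions() {
        let num_columns = 9;
        assert_eq!(super::time_index_column_index(num_columns), 5);
        assert_eq!(super::primary_key_column_index(num_columns), 6);
        assert_eq!(super::sequence_column_index(num_columns), 7);
        assert_eq!(super::op_type_column_index(num_columns), 8);
    }
}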

// TODO(yingwen): Add an option to skip reading internal columns.
/// Helper for reading the flat SST format with projection.
///
/// It only supports the flat format that additionally stores raw primary key columns.
pub struct FlatReadFormat {
    /// The metadata stored in the SST.
    metadata: RegionMetadataRef,
    /// SST file schema.
    arrow_schema: SchemaRef,
    /// Projection computed for the format.
    format_projection: FormatProjection,
    /// Column id to index in SST.
    column_id_to_sst_index: HashMap<ColumnId, usize>,
    /// Sequence number to override the sequence read from the SST.
    override_sequence: Option<SequenceNumber>,
    /// Optional format converter for handling flat format conversion.
    convert_format: Option<FlatConvertFormat>,
}

impl FlatReadFormat {
    /// Creates a helper with existing `metadata` and `column_ids` to read.
    pub fn new(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
        convert_to_flat: bool,
    ) -> FlatReadFormat {
        let arrow_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());

        // Creates a map to look up indices.
        let id_to_index = sst_column_id_indices(&metadata);

        let format_projection = FormatProjection::compute_format_projection(
            &id_to_index,
            arrow_schema.fields.len(),
            column_ids,
        );

        let convert_format = if convert_to_flat {
            let codec = build_primary_key_codec(&metadata);
            FlatConvertFormat::new(Arc::clone(&metadata), &format_projection, codec)
        } else {
            None
        };

        FlatReadFormat {
            metadata,
            arrow_schema,
            format_projection,
            column_id_to_sst_index: id_to_index,
            override_sequence: None,
            convert_format,
        }
    }

    /// Sets the sequence number to override.
    #[allow(dead_code)]
    pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
        self.override_sequence = sequence;
    }

    /// Index of a column in the projected batch by its column id.
    pub fn projected_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
        self.format_projection
            .column_id_to_projected_index
            .get(&column_id)
            .copied()
    }

    /// Returns min values of a specific column in row groups.
    pub fn min_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        self.get_stat_values(row_groups, column_id, true)
    }

    /// Returns max values of a specific column in row groups.
    pub fn max_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        self.get_stat_values(row_groups, column_id, false)
    }

    /// Returns null counts of a specific column in row groups.
    pub fn null_counts(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(index) = self.column_id_to_sst_index.get(&column_id) else {
            // No such column in the SST.
            return StatValues::NoColumn;
        };

        let stats = ReadFormat::column_null_counts(row_groups, *index);
        StatValues::from_stats_opt(stats)
    }

    /// Gets the arrow schema of the SST file.
    ///
    /// This schema is computed from the region metadata but should be the same
    /// as the arrow schema decoded from the file metadata.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        &self.arrow_schema
    }

    /// Gets the metadata of the SST.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        &self.metadata
    }

    /// Gets sorted projection indices to read.
    pub(crate) fn projection_indices(&self) -> &[usize] {
        &self.format_projection.projection_indices
    }

    /// Gets the projection.
    pub(crate) fn format_projection(&self) -> &FormatProjection {
        &self.format_projection
    }

    /// Creates a sequence array to override the sequence read from the SST.
    pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
        self.override_sequence
            .map(|seq| Arc::new(UInt64Array::from_value(seq, length)) as ArrayRef)
    }

    /// Converts a record batch, applying flat format conversion and the override sequence array.
    ///
    /// Returns a new RecordBatch with flat format conversion applied first (if enabled),
    /// then the sequence column replaced by the override sequence array.
    #[allow(dead_code)]
    pub(crate) fn convert_batch(
        &self,
        record_batch: RecordBatch,
        override_sequence_array: Option<&ArrayRef>,
    ) -> Result<RecordBatch> {
        // First, applies flat format conversion if enabled.
        let batch = if let Some(ref convert_format) = self.convert_format {
            convert_format.convert(record_batch)?
        } else {
            record_batch
        };

        // Then applies the sequence override if provided.
        let Some(override_array) = override_sequence_array else {
            return Ok(batch);
        };

        let mut columns = batch.columns().to_vec();
        let sequence_column_idx = sequence_column_index(batch.num_columns());

        // Uses the provided override sequence array, slicing if necessary to match the batch length.
        let sequence_array = if override_array.len() > batch.num_rows() {
            override_array.slice(0, batch.num_rows())
        } else {
            override_array.clone()
        };

        columns[sequence_column_idx] = sequence_array;

        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
    }

    /// Checks whether the batch from the parquet file needs to be converted to match the flat format.
    ///
    /// * `file_path` is the path to the parquet file, for error messages.
    /// * `num_columns` is the number of columns in the parquet file.
    /// * `metadata` is the region metadata (always assumes flat format).
    #[allow(dead_code)]
    pub(crate) fn need_convert_to_flat(
        file_path: &str,
        num_columns: usize,
        metadata: &RegionMetadata,
    ) -> Result<bool> {
        // For the flat format, computes the expected column number:
        // all columns + internal columns (pk, sequence, op_type).
        let expected_columns = metadata.column_metadatas.len() + INTERNAL_COLUMN_NUM;

        if expected_columns == num_columns {
            // Same number of columns, no conversion needed.
            Ok(false)
        } else {
            ensure!(
                expected_columns >= num_columns,
                InvalidParquetSnafu {
                    file: file_path,
                    reason: format!(
                        "Expected columns {} should be >= actual columns {}",
                        expected_columns, num_columns
                    )
                }
            );

            // Different number of columns, checks if the difference matches the primary key count.
            let column_diff = expected_columns - num_columns;

            ensure!(
                column_diff == metadata.primary_key.len(),
                InvalidParquetSnafu {
                    file: file_path,
                    reason: format!(
                        "Column number difference {} does not match primary key count {}",
                        column_diff,
                        metadata.primary_key.len()
                    )
                }
            );

            Ok(true)
        }
    }
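
    // A worked example of the check above, with hypothetical numbers: a region
    // with 2 tags, 2 fields and 1 time index expects 5 + INTERNAL_COLUMN_NUM (3)
    // = 8 columns. A file that also has 8 columns already stores the raw tag
    // columns, so no conversion is needed. A file that stores only the encoded
    // key has 8 - 2 = 6 columns; the difference (2) equals the primary key
    // count, so the batch needs conversion. Any other difference is rejected
    // as an invalid parquet file.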

    fn get_stat_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
        is_min: bool,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            // No such column in the SST.
            return StatValues::NoColumn;
        };
        // Safety: `column_id_to_sst_index` is built from `metadata`.
        let index = self.column_id_to_sst_index.get(&column_id).unwrap();

        let stats = ReadFormat::column_values(row_groups, column, *index, is_min);
        StatValues::from_stats_opt(stats)
    }
}

/// Returns a map from column id to the column's position in the SST.
///
/// It only supports SSTs with raw primary key columns.
pub(crate) fn sst_column_id_indices(metadata: &RegionMetadata) -> HashMap<ColumnId, usize> {
    let mut id_to_index = HashMap::with_capacity(metadata.column_metadatas.len());
    let mut column_index = 0;
    // keys
    for pk_id in &metadata.primary_key {
        id_to_index.insert(*pk_id, column_index);
        column_index += 1;
    }
    // fields
    for column in &metadata.column_metadatas {
        if column.semantic_type == SemanticType::Field {
            id_to_index.insert(column.column_id, column_index);
            column_index += 1;
        }
    }
    // time index
    id_to_index.insert(metadata.time_index_column().column_id, column_index);

    id_to_index
}
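
// A minimal sketch of the ordering produced above, with hypothetical column
// ids (tags 10 and 11, fields 20 and 21, time index 30). It mirrors the loop
// structure of `sst_column_id_indices` without needing a real `RegionMetadata`.
#[cfg(test)]
mod column_order_sketch {
    use std::collections::HashMap;

    #[test]
    fn tags_then_fields_then_time_index() {
        let primary_key = [10u32, 11];
        let fields = [20u32, 21];
        let time_index = 30u32;

        let mut id_to_index = HashMap::new();
        for (i, id) in primary_key.iter().chain(fields.iter()).enumerate() {
            id_to_index.insert(*id, i);
        }
        let next = id_to_index.len();
        id_to_index.insert(time_index, next);

        assert_eq!(id_to_index[&10], 0);
        assert_eq!(id_to_index[&21], 3);
        // The time index is always the last user column.
        assert_eq!(id_to_index[&30], 4);
    }
}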

/// Converts a batch that doesn't have decoded primary key columns into a batch that has decoded
/// primary key columns in flat format.
pub(crate) struct FlatConvertFormat {
    /// Metadata of the region.
    metadata: RegionMetadataRef,
    /// Primary key codec to decode primary keys.
    codec: Arc<dyn PrimaryKeyCodec>,
    /// Projected primary key column information: (column_id, pk_index, column_index in metadata).
    projected_primary_keys: Vec<(ColumnId, usize, usize)>,
}

impl FlatConvertFormat {
    /// Creates a new `FlatConvertFormat`.
    ///
    /// The `format_projection` is the projection computed in the [FlatReadFormat] from the `metadata`.
    /// The `codec` is the primary key codec of the `metadata`.
    ///
    /// Returns `None` if there is no primary key.
    pub(crate) fn new(
        metadata: RegionMetadataRef,
        format_projection: &FormatProjection,
        codec: Arc<dyn PrimaryKeyCodec>,
    ) -> Option<Self> {
        if metadata.primary_key.is_empty() {
            return None;
        }

        // Builds the projected primary key list, maintaining the order of RegionMetadata::primary_key.
        let mut projected_primary_keys = Vec::new();
        for (pk_index, &column_id) in metadata.primary_key.iter().enumerate() {
            if format_projection
                .column_id_to_projected_index
                .contains_key(&column_id)
            {
                // We expect the format_projection to be built from the metadata.
                let column_index = metadata.column_index_by_id(column_id).unwrap();
                projected_primary_keys.push((column_id, pk_index, column_index));
            }
        }

        Some(Self {
            metadata,
            codec,
            projected_primary_keys,
        })
    }

    /// Converts a batch to have decoded primary key columns in flat format.
    ///
    /// The primary key array in the batch is a dictionary array. We decode each value, which is an
    /// encoded primary key, and reuse the keys array to build a dictionary array for each tag column.
    /// The decoded columns are inserted in front of the other columns.
    pub(crate) fn convert(&self, batch: RecordBatch) -> Result<RecordBatch> {
        if self.projected_primary_keys.is_empty() {
            return Ok(batch);
        }

        let primary_key_index = primary_key_column_index(batch.num_columns());
        let pk_dict_array = batch
            .column(primary_key_index)
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: "Primary key column is not a dictionary array".to_string(),
            })?;

        let pk_values_array = pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: "Primary key values are not a binary array".to_string(),
            })?;

        // Decodes all primary key values.
        let mut decoded_pk_values = Vec::with_capacity(pk_values_array.len());
        for i in 0..pk_values_array.len() {
            if pk_values_array.is_null(i) {
                decoded_pk_values.push(None);
            } else {
                let pk_bytes = pk_values_array.value(i);
                let decoded = self.codec.decode(pk_bytes).context(DecodeSnafu)?;
                decoded_pk_values.push(Some(decoded));
            }
        }

        // Builds decoded tag column arrays.
        let mut decoded_columns = Vec::new();
        for (column_id, pk_index, column_index) in &self.projected_primary_keys {
            let column_metadata = &self.metadata.column_metadatas[*column_index];
            let tag_column = self.build_primary_key_column(
                *column_id,
                *pk_index,
                &column_metadata.column_schema.data_type,
                pk_dict_array.keys(),
                &decoded_pk_values,
            )?;
            decoded_columns.push(tag_column);
        }

        // Builds new columns: decoded tag columns first, then the original columns.
        let mut new_columns = Vec::with_capacity(batch.num_columns() + decoded_columns.len());
        new_columns.extend(decoded_columns);
        new_columns.extend_from_slice(batch.columns());

        // Builds the new schema.
        let mut new_fields =
            Vec::with_capacity(batch.schema().fields().len() + self.projected_primary_keys.len());
        for (_, _, column_index) in &self.projected_primary_keys {
            let column_metadata = &self.metadata.column_metadatas[*column_index];
            let old_field = &self.metadata.schema.arrow_schema().fields()[*column_index];
            let field =
                tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, old_field);
            new_fields.push(field);
        }
        new_fields.extend(batch.schema().fields().iter().cloned());

        let new_schema = Arc::new(Schema::new(new_fields));
        RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)
    }

    /// Builds an array for a specific tag column.
    ///
    /// It may build a dictionary array if the type is string. Note that the dictionary
    /// array may have null values, although the keys are not null.
    fn build_primary_key_column(
        &self,
        column_id: ColumnId,
        pk_index: usize,
        column_type: &ConcreteDataType,
        keys: &UInt32Array,
        decoded_pk_values: &[Option<CompositeValues>],
    ) -> Result<ArrayRef> {
        // Gets values from the primary key.
        let mut builder = column_type.create_mutable_vector(decoded_pk_values.len());
        for decoded_opt in decoded_pk_values {
            match decoded_opt {
                Some(decoded) => {
                    match decoded {
                        CompositeValues::Dense(dense) => {
                            if pk_index < dense.len() {
                                builder.push_value_ref(dense[pk_index].1.as_value_ref());
                            } else {
                                builder.push_null();
                            }
                        }
                        CompositeValues::Sparse(sparse) => {
                            let value = sparse.get_or_null(column_id);
                            builder.push_value_ref(value.as_value_ref());
                        }
                    };
                }
                None => builder.push_null(),
            }
        }

        let values_vector = builder.to_vector();
        let values_array = values_vector.to_arrow_array();

        // Only creates a dictionary array for string types; otherwise takes values by keys.
        if matches!(column_type, ConcreteDataType::String(_)) {
            // Creates a dictionary array reusing the same keys for string types.
            // Note that the dictionary values may have nulls.
            let dict_array = DictionaryArray::new(keys.clone(), values_array);
            Ok(Arc::new(dict_array))
        } else {
            // For non-string types, takes values by key indices to create a regular array.
            let taken_array = take(&values_array, keys, None).context(ComputeArrowSnafu)?;
            Ok(taken_array)
        }
    }
}
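
// A minimal sketch of the two strategies above, using only arrow types. The
// keys and values here are hypothetical stand-ins for the primary key
// dictionary keys and the per-key decoded tag values.
#[cfg(test)]
mod tag_column_sketch {
    use std::sync::Arc;

    use datatypes::arrow::array::{Array, DictionaryArray, StringArray, UInt32Array, UInt64Array};
    use datatypes::arrow::compute::kernels::take::take;

    #[test]
    fn dictionary_for_strings_take_for_others() {
        // Two distinct primary keys referenced by four rows.
        let keys = UInt32Array::from(vec![0, 1, 1, 0]);

        // String tags keep the dictionary encoding: wrap the decoded per-key
        // values and reuse the keys, so no per-row copy is made.
        let tag_values = Arc::new(StringArray::from(vec!["host-a", "host-b"]));
        let dict_array = DictionaryArray::new(keys.clone(), tag_values);
        assert_eq!(dict_array.len(), 4);

        // Non-string tags are materialized, one value per row, with `take`.
        let int_values = UInt64Array::from(vec![1u64, 2]);
        let taken = take(&int_values, &keys, None).unwrap();
        assert_eq!(taken.len(), 4);
    }
}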

#[cfg(test)]
impl FlatReadFormat {
    /// Creates a helper with existing `metadata` and all columns.
    pub fn new_with_all_columns(metadata: RegionMetadataRef) -> FlatReadFormat {
        Self::new(
            Arc::clone(&metadata),
            metadata.column_metadatas.iter().map(|c| c.column_id),
            false,
        )
    }
}