// mito2/sst/parquet/flat_format.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Format to store in parquet.
16//!
17//! It can store both encoded primary key and raw key columns.
18//!
//! We store three additional internal columns at the end:
//! - `__primary_key`, the encoded primary key of the row (tags). Type: dictionary(uint32, binary)
//! - `__sequence`, the sequence number of a row. Type: uint64
//! - `__op_type`, the op type of the row. Type: uint8
23//!
24//! The format is
//! ```text
//! primary key columns, field columns, time index, encoded primary key, __sequence, __op_type.
//! ```
//!
//! It stores field columns in the same order as [RegionMetadata::field_columns()](store_api::metadata::RegionMetadata::field_columns())
//! and stores primary key columns in the same order as [RegionMetadata::primary_key].
30
31use std::borrow::Borrow;
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use api::v1::SemanticType;
36use datatypes::arrow::array::{ArrayRef, UInt64Array};
37use datatypes::arrow::datatypes::SchemaRef;
38use datatypes::arrow::record_batch::RecordBatch;
39use parquet::file::metadata::RowGroupMetaData;
40use snafu::ResultExt;
41use store_api::metadata::{RegionMetadata, RegionMetadataRef};
42use store_api::storage::{ColumnId, SequenceNumber};
43
44use crate::error::{NewRecordBatchSnafu, Result};
45use crate::sst::parquet::format::{FormatProjection, ReadFormat, StatValues};
46use crate::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions};
47
/// Helper for writing the SST format.
#[allow(dead_code)]
pub(crate) struct FlatWriteFormat {
    /// Metadata of the region whose data is written.
    metadata: RegionMetadataRef,
    /// SST file schema.
    arrow_schema: SchemaRef,
    /// If set, replaces the sequence of every row when converting batches.
    override_sequence: Option<SequenceNumber>,
}
56
57impl FlatWriteFormat {
58    /// Creates a new helper.
59    #[allow(dead_code)]
60    pub(crate) fn new(metadata: RegionMetadataRef, options: &FlatSchemaOptions) -> FlatWriteFormat {
61        let arrow_schema = to_flat_sst_arrow_schema(&metadata, options);
62        FlatWriteFormat {
63            metadata,
64            arrow_schema,
65            override_sequence: None,
66        }
67    }
68
69    /// Set override sequence.
70    #[allow(dead_code)]
71    pub(crate) fn with_override_sequence(
72        mut self,
73        override_sequence: Option<SequenceNumber>,
74    ) -> Self {
75        self.override_sequence = override_sequence;
76        self
77    }
78
79    /// Gets the arrow schema to store in parquet.
80    #[allow(dead_code)]
81    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
82        &self.arrow_schema
83    }
84
85    /// Convert `batch` to a arrow record batch to store in parquet.
86    #[allow(dead_code)]
87    pub(crate) fn convert_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
88        debug_assert_eq!(batch.num_columns(), self.arrow_schema.fields().len());
89
90        let Some(override_sequence) = self.override_sequence else {
91            return Ok(batch.clone());
92        };
93
94        let mut columns = batch.columns().to_vec();
95        let sequence_array = Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
96        columns[sequence_column_index(batch.num_columns())] = sequence_array;
97
98        RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
99    }
100}
101
/// Returns the position of the sequence column.
pub(crate) fn sequence_column_index(num_columns: usize) -> usize {
    // Trailing columns are: ..., __primary_key, __sequence, __op_type.
    const SEQUENCE_OFFSET_FROM_END: usize = 2;
    num_columns - SEQUENCE_OFFSET_FROM_END
}
106
/// Returns the position of the time index column.
pub(crate) fn time_index_column_index(num_columns: usize) -> usize {
    // The time index sits just before __primary_key, __sequence and __op_type.
    const TIME_INDEX_OFFSET_FROM_END: usize = 4;
    num_columns - TIME_INDEX_OFFSET_FROM_END
}
111
/// Returns the position of the primary key column.
pub(crate) fn primary_key_column_index(num_columns: usize) -> usize {
    // __primary_key is followed by __sequence and __op_type.
    const PRIMARY_KEY_OFFSET_FROM_END: usize = 3;
    num_columns - PRIMARY_KEY_OFFSET_FROM_END
}
116
// TODO(yingwen): Add an option to skip reading internal columns.
/// Helper for reading the flat SST format with projection.
///
/// It only supports flat format that stores primary keys additionally.
pub struct FlatReadFormat {
    /// The metadata stored in the SST.
    metadata: RegionMetadataRef,
    /// SST file schema.
    arrow_schema: SchemaRef,
    /// Indices of columns to read from the SST. It contains all internal columns.
    projection_indices: Vec<usize>,
    /// Maps a column id to its index in the projected schema
    /// (the schema after projection).
    column_id_to_projected_index: HashMap<ColumnId, usize>,
    /// Maps a column id to its index in the SST.
    column_id_to_sst_index: HashMap<ColumnId, usize>,
    /// Sequence number to override the sequence read from the SST.
    override_sequence: Option<SequenceNumber>,
}
136
137impl FlatReadFormat {
138    /// Creates a helper with existing `metadata` and `column_ids` to read.
139    pub fn new(
140        metadata: RegionMetadataRef,
141        column_ids: impl Iterator<Item = ColumnId>,
142    ) -> FlatReadFormat {
143        let arrow_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
144
145        // Creates a map to lookup index.
146        let id_to_index = sst_column_id_indices(&metadata);
147
148        let format_projection = FormatProjection::compute_format_projection(
149            &id_to_index,
150            arrow_schema.fields.len(),
151            column_ids,
152        );
153
154        FlatReadFormat {
155            metadata,
156            arrow_schema,
157            projection_indices: format_projection.projection_indices,
158            column_id_to_projected_index: format_projection.column_id_to_projected_index,
159            column_id_to_sst_index: id_to_index,
160            override_sequence: None,
161        }
162    }
163
164    /// Sets the sequence number to override.
165    #[allow(dead_code)]
166    pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
167        self.override_sequence = sequence;
168    }
169
170    /// Index of a column in the projected batch by its column id.
171    pub fn projected_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
172        self.column_id_to_projected_index.get(&column_id).copied()
173    }
174
175    /// Returns min values of specific column in row groups.
176    pub fn min_values(
177        &self,
178        row_groups: &[impl Borrow<RowGroupMetaData>],
179        column_id: ColumnId,
180    ) -> StatValues {
181        self.get_stat_values(row_groups, column_id, true)
182    }
183
184    /// Returns max values of specific column in row groups.
185    pub fn max_values(
186        &self,
187        row_groups: &[impl Borrow<RowGroupMetaData>],
188        column_id: ColumnId,
189    ) -> StatValues {
190        self.get_stat_values(row_groups, column_id, false)
191    }
192
193    /// Returns null counts of specific column in row groups.
194    pub fn null_counts(
195        &self,
196        row_groups: &[impl Borrow<RowGroupMetaData>],
197        column_id: ColumnId,
198    ) -> StatValues {
199        let Some(index) = self.column_id_to_sst_index.get(&column_id) else {
200            // No such column in the SST.
201            return StatValues::NoColumn;
202        };
203
204        let stats = ReadFormat::column_null_counts(row_groups, *index);
205        StatValues::from_stats_opt(stats)
206    }
207
208    /// Gets the arrow schema of the SST file.
209    ///
210    /// This schema is computed from the region metadata but should be the same
211    /// as the arrow schema decoded from the file metadata.
212    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
213        &self.arrow_schema
214    }
215
216    /// Gets the metadata of the SST.
217    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
218        &self.metadata
219    }
220
221    /// Gets sorted projection indices to read.
222    pub(crate) fn projection_indices(&self) -> &[usize] {
223        &self.projection_indices
224    }
225
226    /// Creates a sequence array to override.
227    #[allow(dead_code)]
228    pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
229        self.override_sequence
230            .map(|seq| Arc::new(UInt64Array::from_value(seq, length)) as ArrayRef)
231    }
232
233    /// Convert a record batch to apply override sequence array.
234    ///
235    /// Returns a new RecordBatch with the sequence column replaced by the override sequence array.
236    #[allow(dead_code)]
237    pub(crate) fn convert_batch(
238        &self,
239        record_batch: &RecordBatch,
240        override_sequence_array: Option<&ArrayRef>,
241    ) -> Result<RecordBatch> {
242        let Some(override_array) = override_sequence_array else {
243            return Ok(record_batch.clone());
244        };
245
246        let mut columns = record_batch.columns().to_vec();
247        let sequence_column_idx = sequence_column_index(record_batch.num_columns());
248
249        // Use the provided override sequence array, slicing if necessary to match batch length
250        let sequence_array = if override_array.len() > record_batch.num_rows() {
251            override_array.slice(0, record_batch.num_rows())
252        } else {
253            override_array.clone()
254        };
255
256        columns[sequence_column_idx] = sequence_array;
257
258        RecordBatch::try_new(record_batch.schema(), columns).context(NewRecordBatchSnafu)
259    }
260
261    fn get_stat_values(
262        &self,
263        row_groups: &[impl Borrow<RowGroupMetaData>],
264        column_id: ColumnId,
265        is_min: bool,
266    ) -> StatValues {
267        let Some(column) = self.metadata.column_by_id(column_id) else {
268            // No such column in the SST.
269            return StatValues::NoColumn;
270        };
271        // Safety: `column_id_to_sst_index` is built from `metadata`.
272        let index = self.column_id_to_sst_index.get(&column_id).unwrap();
273
274        let stats = ReadFormat::column_values(row_groups, column, *index, is_min);
275        StatValues::from_stats_opt(stats)
276    }
277}
278
279/// Returns a map that the key is the column id and the value is the column position
280/// in the SST.
281/// It only supports SSTs with raw primary key columns.
282fn sst_column_id_indices(metadata: &RegionMetadata) -> HashMap<ColumnId, usize> {
283    let mut id_to_index = HashMap::with_capacity(metadata.column_metadatas.len());
284    let mut column_index = 0;
285    // keys
286    for pk_id in &metadata.primary_key {
287        id_to_index.insert(*pk_id, column_index);
288        column_index += 1;
289    }
290    // fields
291    for column in &metadata.column_metadatas {
292        if column.semantic_type == SemanticType::Field {
293            id_to_index.insert(column.column_id, column_index);
294            column_index += 1;
295        }
296    }
297    // time index
298    id_to_index.insert(metadata.time_index_column().column_id, column_index);
299
300    id_to_index
301}
302
#[cfg(test)]
impl FlatReadFormat {
    /// Creates a helper with existing `metadata` and all columns.
    pub fn new_with_all_columns(metadata: RegionMetadataRef) -> FlatReadFormat {
        // Collect the ids first so `metadata` can be moved into `new`.
        let all_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|column| column.column_id)
            .collect();
        Self::new(metadata, all_ids.into_iter())
    }
}