mito2/sst.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Sorted string tables.

use std::sync::Arc;

use api::v1::SemanticType;
use common_base::readable_size::ReadableSize;
use datatypes::arrow::datatypes::{
    DataType as ArrowDataType, Field, FieldRef, Fields, Schema, SchemaRef,
};
use datatypes::prelude::ConcreteDataType;
use serde::{Deserialize, Serialize};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadata;
use store_api::storage::consts::{
    OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
};

pub mod file;
pub mod file_purger;
pub mod file_ref;
pub mod index;
pub mod location;
pub mod parquet;
pub(crate) mod version;

/// Default write buffer size. It should be greater than the minimum multipart
/// upload part size of S3 (5MB).
pub const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8);

/// Default number of concurrent writes. It only takes effect on object store
/// backends (e.g., S3).
pub const DEFAULT_WRITE_CONCURRENCY: usize = 8;

/// Format type of the SST file.
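///
/// A round-trip sketch (illustrative; assumes `serde_json` is available,
/// which this file does not itself declare):
/// ```ignore
/// use std::str::FromStr;
///
/// // strum parses the snake_case name back into the enum.
/// assert_eq!(FormatType::from_str("flat").unwrap(), FormatType::Flat);
/// // serde serializes with the same snake_case convention.
/// assert_eq!(
///     serde_json::to_string(&FormatType::PrimaryKey).unwrap(),
///     "\"primary_key\""
/// );
/// ```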
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, strum::EnumString)]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum FormatType {
    /// Parquet with primary key encoded.
    #[default]
    PrimaryKey,
    /// Flat Parquet format.
    Flat,
}

/// Gets the arrow schema to store in parquet for the primary key format:
/// field columns, then the time index, then the internal columns
/// (`__primary_key`, `__sequence`, `__op_type`).
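///
/// A usage sketch (assumes a valid `RegionMetadata`; illustrative only):
/// ```ignore
/// let schema = to_sst_arrow_schema(&metadata);
/// // The internal columns are always appended at the end.
/// let names: Vec<_> = schema.fields().iter().map(|f| f.name().as_str()).collect();
/// assert!(names.ends_with(&["__primary_key", "__sequence", "__op_type"]));
/// ```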
pub fn to_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef {
    let fields = Fields::from_iter(
        metadata
            .schema
            .arrow_schema()
            .fields()
            .iter()
            .zip(&metadata.column_metadatas)
            .filter_map(|(field, column_meta)| {
                if column_meta.semantic_type == SemanticType::Field {
                    Some(field.clone())
                } else {
                    // We have fixed positions for tags (primary key) and time index.
                    None
                }
            })
            .chain([metadata.time_index_field()])
            .chain(internal_fields()),
    );

    Arc::new(Schema::new(fields))
}

/// Options of the flat schema.
pub struct FlatSchemaOptions {
    /// Whether to store primary key columns additionally instead of an encoded column.
    pub raw_pk_columns: bool,
    /// Whether to use dictionary encoding for string primary key columns
    /// when storing primary key columns.
    /// Only takes effect when `raw_pk_columns` is true.
    pub string_pk_use_dict: bool,
}

impl Default for FlatSchemaOptions {
    fn default() -> Self {
        Self {
            raw_pk_columns: true,
            string_pk_use_dict: true,
        }
    }
}

impl FlatSchemaOptions {
    /// Creates options according to the primary key encoding.
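    ///
    /// A sketch of the mapping (illustrative; the non-dense variant name is
    /// an assumption, not confirmed by this file):
    /// ```ignore
    /// // Dense encoding keeps the raw primary key columns, dictionary encoded.
    /// let opts = FlatSchemaOptions::from_encoding(PrimaryKeyEncoding::Dense);
    /// assert!(opts.raw_pk_columns && opts.string_pk_use_dict);
    ///
    /// // Other encodings only store the encoded `__primary_key` column.
    /// let opts = FlatSchemaOptions::from_encoding(PrimaryKeyEncoding::Sparse);
    /// assert!(!opts.raw_pk_columns);
    /// ```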
    pub fn from_encoding(encoding: PrimaryKeyEncoding) -> Self {
        if encoding == PrimaryKeyEncoding::Dense {
            Self::default()
        } else {
            Self {
                raw_pk_columns: false,
                string_pk_use_dict: false,
            }
        }
    }
}

/// Gets the arrow schema to store in parquet for the flat format.
///
/// The schema is:
/// ```text
/// primary key columns, field columns, time index, __primary_key, __sequence, __op_type
/// ```
///
/// # Panics
/// Panics if the metadata is invalid.
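///
/// A usage sketch (assumes a valid `RegionMetadata`; illustrative only):
/// ```ignore
/// let options = FlatSchemaOptions::default();
/// let schema = to_flat_sst_arrow_schema(&metadata, &options);
/// assert_eq!(
///     schema.fields().len(),
///     flat_sst_arrow_schema_column_num(&metadata, &options)
/// );
/// ```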
pub fn to_flat_sst_arrow_schema(
    metadata: &RegionMetadata,
    options: &FlatSchemaOptions,
) -> SchemaRef {
    let num_fields = flat_sst_arrow_schema_column_num(metadata, options);
    let mut fields = Vec::with_capacity(num_fields);
    let schema = metadata.schema.arrow_schema();
    if options.raw_pk_columns {
        for pk_id in &metadata.primary_key {
            let pk_index = metadata.column_index_by_id(*pk_id).unwrap();
            let old_field = &schema.fields[pk_index];
            // Only dictionary encode string primary key columns when requested;
            // otherwise keep the raw primary key field as-is.
            let new_field = if options.string_pk_use_dict {
                tag_maybe_to_dictionary_field(
                    &metadata.column_metadatas[pk_index].column_schema.data_type,
                    old_field,
                )
            } else {
                old_field.clone()
            };
            fields.push(new_field);
        }
    }
    let remaining_fields = schema
        .fields()
        .iter()
        .zip(&metadata.column_metadatas)
        .filter_map(|(field, column_meta)| {
            if column_meta.semantic_type == SemanticType::Field {
                Some(field.clone())
            } else {
                // Primary key columns are handled above; the time index is appended below.
                None
            }
        })
        .chain([metadata.time_index_field()])
        .chain(internal_fields());
    fields.extend(remaining_fields);

    Arc::new(Schema::new(fields))
}

/// Returns the number of columns in the flat format: all region columns plus
/// the three internal columns, minus the primary key columns when they are
/// not stored as raw columns.
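///
/// For example (illustrative), a region with 2 tags, 1 field and the time
/// index has 4 columns, so the flat schema has 4 + 3 = 7 columns when raw
/// primary key columns are stored, and 7 - 2 = 5 columns otherwise.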
pub fn flat_sst_arrow_schema_column_num(
    metadata: &RegionMetadata,
    options: &FlatSchemaOptions,
) -> usize {
    if options.raw_pk_columns {
        metadata.column_metadatas.len() + 3
    } else {
        metadata.column_metadatas.len() + 3 - metadata.primary_key.len()
    }
}

/// Helper function to create a dictionary field from a field.
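///
/// A sketch of the result (illustrative):
/// ```ignore
/// let field = Field::new("tag", ArrowDataType::Utf8, true);
/// let dict = to_dictionary_field(&field);
/// assert_eq!(
///     dict.data_type(),
///     &ArrowDataType::Dictionary(
///         Box::new(ArrowDataType::UInt32),
///         Box::new(ArrowDataType::Utf8)
///     )
/// );
/// ```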
fn to_dictionary_field(field: &Field) -> Field {
    Field::new_dictionary(
        field.name(),
        ArrowDataType::UInt32,
        field.data_type().clone(),
        field.is_nullable(),
    )
}

/// Helper function to create a dictionary field from a field if it is a string column.
pub(crate) fn tag_maybe_to_dictionary_field(
    data_type: &ConcreteDataType,
    field: &Arc<Field>,
) -> Arc<Field> {
    if data_type.is_string() {
        Arc::new(to_dictionary_field(field))
    } else {
        field.clone()
    }
}

/// Fields for the internal columns: `__primary_key` (dictionary-encoded binary),
/// `__sequence` and `__op_type`.
pub(crate) fn internal_fields() -> [FieldRef; 3] {
    // Internal columns are always not null.
    [
        Arc::new(Field::new_dictionary(
            PRIMARY_KEY_COLUMN_NAME,
            ArrowDataType::UInt32,
            ArrowDataType::Binary,
            false,
        )),
        Arc::new(Field::new(
            SEQUENCE_COLUMN_NAME,
            ArrowDataType::UInt64,
            false,
        )),
        Arc::new(Field::new(OP_TYPE_COLUMN_NAME, ArrowDataType::UInt8, false)),
    ]
}

/// Gets the arrow schema to store in parquet for the plain format: all region
/// columns as-is, followed by the `__sequence` and `__op_type` internal columns.
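///
/// A usage sketch (assumes a valid `RegionMetadata`; illustrative only):
/// ```ignore
/// let schema = to_plain_sst_arrow_schema(&metadata);
/// // Only two internal columns are appended; there is no `__primary_key`.
/// assert_eq!(schema.fields().len(), metadata.column_metadatas.len() + 2);
/// ```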
pub fn to_plain_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef {
    let fields = Fields::from_iter(
        metadata
            .schema
            .arrow_schema()
            .fields()
            .iter()
            .cloned()
            .chain(plain_internal_fields()),
    );

    Arc::new(Schema::new(fields))
}

/// Fields for the internal columns of the plain format (`__sequence` and
/// `__op_type`).
fn plain_internal_fields() -> [FieldRef; 2] {
    // Internal columns are always not null.
    [
        Arc::new(Field::new(
            SEQUENCE_COLUMN_NAME,
            ArrowDataType::UInt64,
            false,
        )),
        Arc::new(Field::new(OP_TYPE_COLUMN_NAME, ArrowDataType::UInt8, false)),
    ]
}