// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Sorted string tables (SSTs).

use std::sync::Arc;

use api::v1::SemanticType;
use common_base::readable_size::ReadableSize;
use datatypes::arrow::datatypes::{
    DataType as ArrowDataType, Field, FieldRef, Fields, Schema, SchemaRef,
};
use datatypes::prelude::ConcreteDataType;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadata;
use store_api::storage::consts::{
    OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
};

pub mod file;
pub mod file_purger;
pub mod file_ref;
pub mod index;
pub mod location;
pub mod parquet;
pub(crate) mod version;

/// Default write buffer size. It should be greater than the default minimum upload part size of S3 (5MB).
pub const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8);

/// Default number of concurrent writes. It only takes effect on object store backends (e.g., S3).
pub const DEFAULT_WRITE_CONCURRENCY: usize = 8;

/// Gets the arrow schema to store in parquet.
pub fn to_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef {
    let fields = Fields::from_iter(
        metadata
            .schema
            .arrow_schema()
            .fields()
            .iter()
            .zip(&metadata.column_metadatas)
            .filter_map(|(field, column_meta)| {
                if column_meta.semantic_type == SemanticType::Field {
                    Some(field.clone())
                } else {
                    // We have fixed positions for tags (primary key) and time index.
                    None
                }
            })
            .chain([metadata.time_index_field()])
            .chain(internal_fields()),
    );

    Arc::new(Schema::new(fields))
}
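
// A minimal sketch of the column order `to_sst_arrow_schema` produces. The
// `RegionMetadataBuilder`/`ColumnMetadata` usage below is an assumption about
// the `store_api` builder API, and the column names are hypothetical.
#[cfg(test)]
mod sst_schema_tests {
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;

    #[test]
    fn fields_then_time_index_then_internal_columns() {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_0",
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .primary_key(vec![1]);
        let metadata = builder.build().unwrap();

        let schema = to_sst_arrow_schema(&metadata);
        let names: Vec<_> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        // The tag column is not stored at its original position; it lives in
        // the encoded `__primary_key` column instead.
        assert_eq!(
            names,
            [
                "field_0",
                "ts",
                PRIMARY_KEY_COLUMN_NAME,
                SEQUENCE_COLUMN_NAME,
                OP_TYPE_COLUMN_NAME
            ]
        );
    }
}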

/// Options for the flat schema.
pub struct FlatSchemaOptions {
    /// Whether to store the raw primary key columns in addition to the encoded
    /// `__primary_key` column.
    pub raw_pk_columns: bool,
    /// Whether to use dictionary encoding for string primary key columns
    /// when storing primary key columns.
    /// Only takes effect when `raw_pk_columns` is true.
    pub string_pk_use_dict: bool,
}

impl Default for FlatSchemaOptions {
    fn default() -> Self {
        Self {
            raw_pk_columns: true,
            string_pk_use_dict: true,
        }
    }
}

impl FlatSchemaOptions {
    /// Creates options according to the primary key encoding.
    pub fn from_encoding(encoding: PrimaryKeyEncoding) -> Self {
        if encoding == PrimaryKeyEncoding::Dense {
            Self::default()
        } else {
            Self {
                raw_pk_columns: false,
                string_pk_use_dict: false,
            }
        }
    }
}
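
// A short illustration of `FlatSchemaOptions::from_encoding`, assuming
// `PrimaryKeyEncoding::Sparse` is the non-dense variant: dense encoding keeps
// raw primary key columns (with dictionary-encoded strings), while sparse
// falls back to the encoded `__primary_key` column only.
#[cfg(test)]
mod flat_schema_options_tests {
    use super::*;

    #[test]
    fn options_follow_encoding() {
        let dense = FlatSchemaOptions::from_encoding(PrimaryKeyEncoding::Dense);
        assert!(dense.raw_pk_columns);
        assert!(dense.string_pk_use_dict);

        let sparse = FlatSchemaOptions::from_encoding(PrimaryKeyEncoding::Sparse);
        assert!(!sparse.raw_pk_columns);
        assert!(!sparse.string_pk_use_dict);
    }
}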

/// Gets the arrow schema of the flat format to store in parquet.
///
/// The schema is:
/// ```text
/// primary key columns, field columns, time index, __primary_key, __sequence, __op_type
/// ```
///
/// # Panics
/// Panics if the metadata is invalid.
pub fn to_flat_sst_arrow_schema(
    metadata: &RegionMetadata,
    options: &FlatSchemaOptions,
) -> SchemaRef {
    let num_fields = if options.raw_pk_columns {
        metadata.column_metadatas.len() + 3
    } else {
        metadata.column_metadatas.len() + 3 - metadata.primary_key.len()
    };
    let mut fields = Vec::with_capacity(num_fields);
    let schema = metadata.schema.arrow_schema();
    if options.raw_pk_columns {
        for pk_id in &metadata.primary_key {
            let pk_index = metadata.column_index_by_id(*pk_id).unwrap();
            let old_field = &schema.fields[pk_index];
            if options.string_pk_use_dict {
                let new_field = tag_maybe_to_dictionary_field(
                    &metadata.column_metadatas[pk_index].column_schema.data_type,
                    old_field,
                );
                fields.push(new_field);
            } else {
                // Keep the raw primary key column as-is.
                fields.push(old_field.clone());
            }
        }
    }
    let remaining_fields = schema
        .fields()
        .iter()
        .zip(&metadata.column_metadatas)
        .filter_map(|(field, column_meta)| {
            if column_meta.semantic_type == SemanticType::Field {
                Some(field.clone())
            } else {
                None
            }
        })
        .chain([metadata.time_index_field()])
        .chain(internal_fields());
    fields.extend(remaining_fields);

    Arc::new(Schema::new(fields))
}
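
// A minimal sketch of the flat layout, reusing the hypothetical builder usage
// from the sketch above: with default options the string tag is stored first
// as a dictionary field, followed by the time index and internal columns.
#[cfg(test)]
mod flat_schema_tests {
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;

    #[test]
    fn flat_order_with_raw_pk_columns() {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_0",
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .primary_key(vec![1]);
        let metadata = builder.build().unwrap();

        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let names: Vec<_> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            names,
            [
                "tag_0",
                "ts",
                PRIMARY_KEY_COLUMN_NAME,
                SEQUENCE_COLUMN_NAME,
                OP_TYPE_COLUMN_NAME
            ]
        );
        // The string tag column is dictionary encoded.
        assert!(matches!(
            schema.field(0).data_type(),
            ArrowDataType::Dictionary(_, _)
        ));
    }
}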

/// Helper function to create a dictionary field from a field.
fn to_dictionary_field(field: &Field) -> Field {
    Field::new_dictionary(
        field.name(),
        ArrowDataType::UInt32,
        field.data_type().clone(),
        field.is_nullable(),
    )
}

/// Helper function to create a dictionary field from a field if it is a string column.
pub(crate) fn tag_maybe_to_dictionary_field(
    data_type: &ConcreteDataType,
    field: &Arc<Field>,
) -> Arc<Field> {
    if data_type.is_string() {
        Arc::new(to_dictionary_field(field))
    } else {
        field.clone()
    }
}
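
// A small illustration of `tag_maybe_to_dictionary_field`: string tags become
// `Dictionary(UInt32, Utf8)` fields, while non-string tags pass through
// unchanged. Field names here are hypothetical.
#[cfg(test)]
mod dictionary_field_tests {
    use super::*;

    #[test]
    fn string_tags_become_dictionaries() {
        let string_field = Arc::new(Field::new("tag_0", ArrowDataType::Utf8, true));
        let converted =
            tag_maybe_to_dictionary_field(&ConcreteDataType::string_datatype(), &string_field);
        assert_eq!(
            converted.data_type(),
            &ArrowDataType::Dictionary(
                Box::new(ArrowDataType::UInt32),
                Box::new(ArrowDataType::Utf8)
            )
        );

        let int_field = Arc::new(Field::new("tag_1", ArrowDataType::Int64, true));
        let untouched =
            tag_maybe_to_dictionary_field(&ConcreteDataType::int64_datatype(), &int_field);
        assert_eq!(untouched.data_type(), &ArrowDataType::Int64);
    }
}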

/// Fields for internal columns.
pub(crate) fn internal_fields() -> [FieldRef; 3] {
    // Internal columns are always not null.
    [
        Arc::new(Field::new_dictionary(
            PRIMARY_KEY_COLUMN_NAME,
            ArrowDataType::UInt32,
            ArrowDataType::Binary,
            false,
        )),
        Arc::new(Field::new(
            SEQUENCE_COLUMN_NAME,
            ArrowDataType::UInt64,
            false,
        )),
        Arc::new(Field::new(OP_TYPE_COLUMN_NAME, ArrowDataType::UInt8, false)),
    ]
}

/// Gets the arrow schema of the plain format to store in parquet.
pub fn to_plain_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef {
    let fields = Fields::from_iter(
        metadata
            .schema
            .arrow_schema()
            .fields()
            .iter()
            .cloned()
            .chain(plain_internal_fields()),
    );

    Arc::new(Schema::new(fields))
}

/// Fields for internal columns of the plain format.
fn plain_internal_fields() -> [FieldRef; 2] {
    // Internal columns are always not null.
    [
        Arc::new(Field::new(
            SEQUENCE_COLUMN_NAME,
            ArrowDataType::UInt64,
            false,
        )),
        Arc::new(Field::new(OP_TYPE_COLUMN_NAME, ArrowDataType::UInt8, false)),
    ]
}
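
// A minimal sketch of the plain layout, again assuming the hypothetical
// builder usage from the sketches above: every column keeps its original
// position and only `__sequence` and `__op_type` are appended.
#[cfg(test)]
mod plain_schema_tests {
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;

    #[test]
    fn plain_schema_appends_internal_columns() {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder.push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id: 1,
        });
        let metadata = builder.build().unwrap();

        let schema = to_plain_sst_arrow_schema(&metadata);
        let names: Vec<_> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(names, ["ts", SEQUENCE_COLUMN_NAME, OP_TYPE_COLUMN_NAME]);
    }
}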