mito2/sst.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Sorted string tables.

use std::sync::Arc;

use api::v1::SemanticType;
use common_base::readable_size::ReadableSize;
use datatypes::arrow::datatypes::{
    DataType as ArrowDataType, Field, FieldRef, Fields, Schema, SchemaRef,
};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadata;
use store_api::storage::consts::{
    OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
};

pub mod file;
pub mod file_purger;
pub mod index;
pub mod location;
pub mod parquet;
pub(crate) mod version;

/// Default write buffer size; it should be greater than the minimum multipart upload part size of S3 (5 MB).
pub const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8);

/// Default number of concurrent writes; it only takes effect on object store backends (e.g., S3).
pub const DEFAULT_WRITE_CONCURRENCY: usize = 8;

/// Gets the arrow schema to store in parquet.
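///
/// The resulting schema stores field columns first, then the time index, and
/// finally the internal `__primary_key`, `__sequence` and `__op_type` columns:
/// ```text
/// field columns, time index, __primary_key, __sequence, __op_type
/// ```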
pub fn to_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef {
    let fields = Fields::from_iter(
        metadata
            .schema
            .arrow_schema()
            .fields()
            .iter()
            .zip(&metadata.column_metadatas)
            .filter_map(|(field, column_meta)| {
                if column_meta.semantic_type == SemanticType::Field {
                    Some(field.clone())
                } else {
                    // We have fixed positions for tags (primary key) and time index.
                    None
                }
            })
            .chain([metadata.time_index_field()])
            .chain(internal_fields()),
    );

    Arc::new(Schema::new(fields))
}

/// Options of the flat schema.
pub struct FlatSchemaOptions {
    /// Whether to store primary key columns as individual columns
    /// in addition to the encoded `__primary_key` column.
    pub raw_pk_columns: bool,
    /// Whether to use dictionary encoding for string primary key columns
    /// when storing primary key columns.
    /// Only takes effect when `raw_pk_columns` is true.
    pub string_pk_use_dict: bool,
}

impl Default for FlatSchemaOptions {
    fn default() -> Self {
        Self {
            raw_pk_columns: true,
            string_pk_use_dict: true,
        }
    }
}

impl FlatSchemaOptions {
    /// Creates options according to the primary key encoding.
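    ///
    /// A minimal usage sketch (not compiled):
    /// ```ignore
    /// // Dense encoding keeps raw primary key columns and dictionary-encodes string tags.
    /// let opts = FlatSchemaOptions::from_encoding(PrimaryKeyEncoding::Dense);
    /// assert!(opts.raw_pk_columns && opts.string_pk_use_dict);
    /// ```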
    pub fn from_encoding(encoding: PrimaryKeyEncoding) -> Self {
        if encoding == PrimaryKeyEncoding::Dense {
            Self::default()
        } else {
            Self {
                raw_pk_columns: false,
                string_pk_use_dict: false,
            }
        }
    }
}

/// Gets the arrow schema to store in parquet.
///
/// The schema is:
/// ```text
/// primary key columns, field columns, time index, __primary_key, __sequence, __op_type
/// ```
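///
/// For example, for a hypothetical region with string primary key columns
/// `host` and `region`, a field `cpu` and time index `ts`, the default
/// [`FlatSchemaOptions`] produce:
/// ```text
/// host (dict), region (dict), cpu, ts, __primary_key, __sequence, __op_type
/// ```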
///
/// # Panics
/// Panics if the metadata is invalid.
pub fn to_flat_sst_arrow_schema(
    metadata: &RegionMetadata,
    options: &FlatSchemaOptions,
) -> SchemaRef {
    let num_fields = if options.raw_pk_columns {
        metadata.column_metadatas.len() + 3
    } else {
        metadata.column_metadatas.len() + 3 - metadata.primary_key.len()
    };
    let mut fields = Vec::with_capacity(num_fields);
    let schema = metadata.schema.arrow_schema();
    // Stores primary key columns as individual columns first if requested.
    if options.raw_pk_columns {
        for pk_id in &metadata.primary_key {
            let pk_index = metadata.column_index_by_id(*pk_id).unwrap();
            if options.string_pk_use_dict
                && metadata.column_metadatas[pk_index]
                    .column_schema
                    .data_type
                    .is_string()
            {
                let field = &schema.fields[pk_index];
                let field = Arc::new(Field::new_dictionary(
                    field.name(),
                    ArrowDataType::UInt32,
                    field.data_type().clone(),
                    field.is_nullable(),
                ));
                fields.push(field);
            } else {
                fields.push(schema.fields[pk_index].clone());
            }
        }
    }
    // Then field columns, the time index and the internal columns.
    let remaining_fields = schema
        .fields()
        .iter()
        .zip(&metadata.column_metadatas)
        .filter_map(|(field, column_meta)| {
            if column_meta.semantic_type == SemanticType::Field {
                Some(field.clone())
            } else {
                None
            }
        })
        .chain([metadata.time_index_field()])
        .chain(internal_fields());
    for field in remaining_fields {
        fields.push(field);
    }

    Arc::new(Schema::new(fields))
}

/// Fields for internal columns.
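///
/// `__primary_key` is a dictionary field (`UInt32` keys over `Binary` values),
/// `__sequence` is `UInt64` and `__op_type` is `UInt8`; all of them are non-nullable.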
fn internal_fields() -> [FieldRef; 3] {
    // Internal columns are always not null.
    [
        Arc::new(Field::new_dictionary(
            PRIMARY_KEY_COLUMN_NAME,
            ArrowDataType::UInt32,
            ArrowDataType::Binary,
            false,
        )),
        Arc::new(Field::new(
            SEQUENCE_COLUMN_NAME,
            ArrowDataType::UInt64,
            false,
        )),
        Arc::new(Field::new(OP_TYPE_COLUMN_NAME, ArrowDataType::UInt8, false)),
    ]
}

/// Gets the arrow schema to store in parquet.
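///
/// The plain schema keeps every column of the region schema in its original order
/// and appends the internal `__sequence` and `__op_type` columns:
/// ```text
/// all region columns, __sequence, __op_type
/// ```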
pub fn to_plain_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef {
    let fields = Fields::from_iter(
        metadata
            .schema
            .arrow_schema()
            .fields()
            .iter()
            .cloned()
            .chain(plain_internal_fields()),
    );

    Arc::new(Schema::new(fields))
}

/// Fields for internal columns of the plain schema.
fn plain_internal_fields() -> [FieldRef; 2] {
    // Internal columns are always not null.
    [
        Arc::new(Field::new(
            SEQUENCE_COLUMN_NAME,
            ArrowDataType::UInt64,
            false,
        )),
        Arc::new(Field::new(OP_TYPE_COLUMN_NAME, ArrowDataType::UInt8, false)),
    ]
}