1use std::sync::Arc;
18
19use api::v1::SemanticType;
20use common_base::readable_size::ReadableSize;
21use datatypes::arrow::datatypes::{
22 DataType as ArrowDataType, Field, FieldRef, Fields, Schema, SchemaRef,
23};
24use datatypes::prelude::ConcreteDataType;
25use serde::{Deserialize, Serialize};
26use store_api::codec::PrimaryKeyEncoding;
27use store_api::metadata::RegionMetadata;
28use store_api::storage::consts::{
29 OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
30};
31
32pub mod file;
33pub mod file_purger;
34pub mod file_ref;
35pub mod index;
36pub mod location;
37pub mod parquet;
38pub(crate) mod version;
39
/// Default write buffer size: 8 MB (`ReadableSize::mb(8)`).
// NOTE(review): the consumers of this constant live outside this file — confirm what
// exactly is buffered (e.g. object-store uploads) before relying on this description.
pub const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8);

/// Default write concurrency: 8.
// NOTE(review): presumably the number of concurrent write operations per SST write;
// confirm against the callers that read this constant.
pub const DEFAULT_WRITE_CONCURRENCY: usize = 8;
45
46#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, strum::EnumString)]
48#[serde(rename_all = "snake_case")]
49#[strum(serialize_all = "snake_case")]
50pub enum FormatType {
51 #[default]
53 PrimaryKey,
54 Flat,
56}
57
58pub fn to_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef {
60 let fields = Fields::from_iter(
61 metadata
62 .schema
63 .arrow_schema()
64 .fields()
65 .iter()
66 .zip(&metadata.column_metadatas)
67 .filter_map(|(field, column_meta)| {
68 if column_meta.semantic_type == SemanticType::Field {
69 Some(field.clone())
70 } else {
71 None
73 }
74 })
75 .chain([metadata.time_index_field()])
76 .chain(internal_fields()),
77 );
78
79 Arc::new(Schema::new(fields))
80}
81
/// Options controlling the layout produced by `to_flat_sst_arrow_schema`.
///
/// A plain pair of flags, so it is cheap to copy and compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FlatSchemaOptions {
    /// When true, one column per primary key column is stored ahead of the field columns,
    /// in addition to the internal encoded primary key column.
    pub raw_pk_columns: bool,
    /// When true, string-typed raw primary key columns are stored as dictionary arrays
    /// with `UInt32` keys. Only relevant when `raw_pk_columns` is set.
    pub string_pk_use_dict: bool,
}
91
92impl Default for FlatSchemaOptions {
93 fn default() -> Self {
94 Self {
95 raw_pk_columns: true,
96 string_pk_use_dict: true,
97 }
98 }
99}
100
101impl FlatSchemaOptions {
102 pub fn from_encoding(encoding: PrimaryKeyEncoding) -> Self {
104 if encoding == PrimaryKeyEncoding::Dense {
105 Self::default()
106 } else {
107 Self {
108 raw_pk_columns: false,
109 string_pk_use_dict: false,
110 }
111 }
112 }
113}
114
115pub fn to_flat_sst_arrow_schema(
125 metadata: &RegionMetadata,
126 options: &FlatSchemaOptions,
127) -> SchemaRef {
128 let num_fields = flat_sst_arrow_schema_column_num(metadata, options);
129 let mut fields = Vec::with_capacity(num_fields);
130 let schema = metadata.schema.arrow_schema();
131 if options.raw_pk_columns {
132 for pk_id in &metadata.primary_key {
133 let pk_index = metadata.column_index_by_id(*pk_id).unwrap();
134 if options.string_pk_use_dict {
135 let old_field = &schema.fields[pk_index];
136 let new_field = tag_maybe_to_dictionary_field(
137 &metadata.column_metadatas[pk_index].column_schema.data_type,
138 old_field,
139 );
140 fields.push(new_field);
141 }
142 }
143 }
144 let remaining_fields = schema
145 .fields()
146 .iter()
147 .zip(&metadata.column_metadatas)
148 .filter_map(|(field, column_meta)| {
149 if column_meta.semantic_type == SemanticType::Field {
150 Some(field.clone())
151 } else {
152 None
153 }
154 })
155 .chain([metadata.time_index_field()])
156 .chain(internal_fields());
157 for field in remaining_fields {
158 fields.push(field);
159 }
160
161 Arc::new(Schema::new(fields))
162}
163
164pub fn flat_sst_arrow_schema_column_num(
166 metadata: &RegionMetadata,
167 options: &FlatSchemaOptions,
168) -> usize {
169 if options.raw_pk_columns {
170 metadata.column_metadatas.len() + 3
171 } else {
172 metadata.column_metadatas.len() + 3 - metadata.primary_key.len()
173 }
174}
175
176fn to_dictionary_field(field: &Field) -> Field {
178 Field::new_dictionary(
179 field.name(),
180 datatypes::arrow::datatypes::DataType::UInt32,
181 field.data_type().clone(),
182 field.is_nullable(),
183 )
184}
185
186pub(crate) fn tag_maybe_to_dictionary_field(
188 data_type: &ConcreteDataType,
189 field: &Arc<Field>,
190) -> Arc<Field> {
191 if data_type.is_string() {
192 Arc::new(to_dictionary_field(field))
193 } else {
194 field.clone()
195 }
196}
197
198pub(crate) fn internal_fields() -> [FieldRef; 3] {
200 [
202 Arc::new(Field::new_dictionary(
203 PRIMARY_KEY_COLUMN_NAME,
204 ArrowDataType::UInt32,
205 ArrowDataType::Binary,
206 false,
207 )),
208 Arc::new(Field::new(
209 SEQUENCE_COLUMN_NAME,
210 ArrowDataType::UInt64,
211 false,
212 )),
213 Arc::new(Field::new(OP_TYPE_COLUMN_NAME, ArrowDataType::UInt8, false)),
214 ]
215}
216
217pub fn to_plain_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef {
219 let fields = Fields::from_iter(
220 metadata
221 .schema
222 .arrow_schema()
223 .fields()
224 .iter()
225 .cloned()
226 .chain(plain_internal_fields()),
227 );
228
229 Arc::new(Schema::new(fields))
230}
231
232fn plain_internal_fields() -> [FieldRef; 2] {
234 [
236 Arc::new(Field::new(
237 SEQUENCE_COLUMN_NAME,
238 ArrowDataType::UInt64,
239 false,
240 )),
241 Arc::new(Field::new(OP_TYPE_COLUMN_NAME, ArrowDataType::UInt8, false)),
242 ]
243}