1use std::sync::Arc;
18
19use api::v1::SemanticType;
20use common_base::readable_size::ReadableSize;
21use datatypes::arrow::datatypes::{
22 DataType as ArrowDataType, Field, FieldRef, Fields, Schema, SchemaRef,
23};
24use datatypes::prelude::ConcreteDataType;
25use store_api::codec::PrimaryKeyEncoding;
26use store_api::metadata::RegionMetadata;
27use store_api::storage::consts::{
28 OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
29};
30
31pub mod file;
32pub mod file_purger;
33pub mod file_ref;
34pub mod index;
35pub mod location;
36pub mod parquet;
37pub(crate) mod version;
38
/// Default write buffer size, built with `ReadableSize::mb(8)`.
// NOTE(review): the consumers of this constant are not visible in this file — confirm
// where the buffer is used before changing the value.
pub const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8);
41
/// Default write concurrency (`8`).
// NOTE(review): presumably the number of concurrent write operations; the consumers are
// not visible in this file — confirm against the call sites.
pub const DEFAULT_WRITE_CONCURRENCY: usize = 8;
44
45pub fn to_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef {
47 let fields = Fields::from_iter(
48 metadata
49 .schema
50 .arrow_schema()
51 .fields()
52 .iter()
53 .zip(&metadata.column_metadatas)
54 .filter_map(|(field, column_meta)| {
55 if column_meta.semantic_type == SemanticType::Field {
56 Some(field.clone())
57 } else {
58 None
60 }
61 })
62 .chain([metadata.time_index_field()])
63 .chain(internal_fields()),
64 );
65
66 Arc::new(Schema::new(fields))
67}
68
/// Options that control how [`to_flat_sst_arrow_schema`] lays out primary key columns.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FlatSchemaOptions {
    /// Whether the schema contains one column per primary key column, placed before
    /// the field columns.
    pub raw_pk_columns: bool,
    /// Whether string primary key columns are converted to dictionary fields.
    /// Only consulted when `raw_pk_columns` is `true`.
    pub string_pk_use_dict: bool,
}
78
79impl Default for FlatSchemaOptions {
80 fn default() -> Self {
81 Self {
82 raw_pk_columns: true,
83 string_pk_use_dict: true,
84 }
85 }
86}
87
88impl FlatSchemaOptions {
89 pub fn from_encoding(encoding: PrimaryKeyEncoding) -> Self {
91 if encoding == PrimaryKeyEncoding::Dense {
92 Self::default()
93 } else {
94 Self {
95 raw_pk_columns: false,
96 string_pk_use_dict: false,
97 }
98 }
99 }
100}
101
102pub fn to_flat_sst_arrow_schema(
112 metadata: &RegionMetadata,
113 options: &FlatSchemaOptions,
114) -> SchemaRef {
115 let num_fields = if options.raw_pk_columns {
116 metadata.column_metadatas.len() + 3
117 } else {
118 metadata.column_metadatas.len() + 3 - metadata.primary_key.len()
119 };
120 let mut fields = Vec::with_capacity(num_fields);
121 let schema = metadata.schema.arrow_schema();
122 if options.raw_pk_columns {
123 for pk_id in &metadata.primary_key {
124 let pk_index = metadata.column_index_by_id(*pk_id).unwrap();
125 if options.string_pk_use_dict {
126 let old_field = &schema.fields[pk_index];
127 let new_field = tag_maybe_to_dictionary_field(
128 &metadata.column_metadatas[pk_index].column_schema.data_type,
129 old_field,
130 );
131 fields.push(new_field);
132 }
133 }
134 }
135 let remaining_fields = schema
136 .fields()
137 .iter()
138 .zip(&metadata.column_metadatas)
139 .filter_map(|(field, column_meta)| {
140 if column_meta.semantic_type == SemanticType::Field {
141 Some(field.clone())
142 } else {
143 None
144 }
145 })
146 .chain([metadata.time_index_field()])
147 .chain(internal_fields());
148 for field in remaining_fields {
149 fields.push(field);
150 }
151
152 Arc::new(Schema::new(fields))
153}
154
155fn to_dictionary_field(field: &Field) -> Field {
157 Field::new_dictionary(
158 field.name(),
159 datatypes::arrow::datatypes::DataType::UInt32,
160 field.data_type().clone(),
161 field.is_nullable(),
162 )
163}
164
165pub(crate) fn tag_maybe_to_dictionary_field(
167 data_type: &ConcreteDataType,
168 field: &Arc<Field>,
169) -> Arc<Field> {
170 if data_type.is_string() {
171 Arc::new(to_dictionary_field(field))
172 } else {
173 field.clone()
174 }
175}
176
177pub(crate) fn internal_fields() -> [FieldRef; 3] {
179 [
181 Arc::new(Field::new_dictionary(
182 PRIMARY_KEY_COLUMN_NAME,
183 ArrowDataType::UInt32,
184 ArrowDataType::Binary,
185 false,
186 )),
187 Arc::new(Field::new(
188 SEQUENCE_COLUMN_NAME,
189 ArrowDataType::UInt64,
190 false,
191 )),
192 Arc::new(Field::new(OP_TYPE_COLUMN_NAME, ArrowDataType::UInt8, false)),
193 ]
194}
195
196pub fn to_plain_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef {
198 let fields = Fields::from_iter(
199 metadata
200 .schema
201 .arrow_schema()
202 .fields()
203 .iter()
204 .cloned()
205 .chain(plain_internal_fields()),
206 );
207
208 Arc::new(Schema::new(fields))
209}
210
211fn plain_internal_fields() -> [FieldRef; 2] {
213 [
215 Arc::new(Field::new(
216 SEQUENCE_COLUMN_NAME,
217 ArrowDataType::UInt64,
218 false,
219 )),
220 Arc::new(Field::new(OP_TYPE_COLUMN_NAME, ArrowDataType::UInt8, false)),
221 ]
222}