1use std::sync::Arc;
18
19use api::v1::SemanticType;
20use common_base::readable_size::ReadableSize;
21use datatypes::arrow::datatypes::{
22 DataType as ArrowDataType, Field, FieldRef, Fields, Schema, SchemaRef,
23};
24use store_api::codec::PrimaryKeyEncoding;
25use store_api::metadata::RegionMetadata;
26use store_api::storage::consts::{
27 OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
28};
29
pub mod file;
pub mod file_purger;
pub mod index;
pub mod location;
pub mod parquet;
pub(crate) mod version;

/// Default buffer size used when writing SST files (8 MiB).
pub const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8);

/// Default concurrency for SST write operations.
// NOTE(review): the exact consumer of this constant is outside this file — confirm with the writer.
pub const DEFAULT_WRITE_CONCURRENCY: usize = 8;
42
43pub fn to_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef {
45 let fields = Fields::from_iter(
46 metadata
47 .schema
48 .arrow_schema()
49 .fields()
50 .iter()
51 .zip(&metadata.column_metadatas)
52 .filter_map(|(field, column_meta)| {
53 if column_meta.semantic_type == SemanticType::Field {
54 Some(field.clone())
55 } else {
56 None
58 }
59 })
60 .chain([metadata.time_index_field()])
61 .chain(internal_fields()),
62 );
63
64 Arc::new(Schema::new(fields))
65}
66
/// Options controlling the layout of the "flat" SST Arrow schema.
pub struct FlatSchemaOptions {
    /// Whether to include the raw (un-encoded) primary key columns in the schema.
    pub raw_pk_columns: bool,
    /// Whether string primary key columns are stored dictionary-encoded
    /// (only consulted when `raw_pk_columns` is true).
    pub string_pk_use_dict: bool,
}
76
77impl Default for FlatSchemaOptions {
78 fn default() -> Self {
79 Self {
80 raw_pk_columns: true,
81 string_pk_use_dict: true,
82 }
83 }
84}
85
86impl FlatSchemaOptions {
87 pub fn from_encoding(encoding: PrimaryKeyEncoding) -> Self {
89 if encoding == PrimaryKeyEncoding::Dense {
90 Self::default()
91 } else {
92 Self {
93 raw_pk_columns: false,
94 string_pk_use_dict: false,
95 }
96 }
97 }
98}
99
100pub fn to_flat_sst_arrow_schema(
110 metadata: &RegionMetadata,
111 options: &FlatSchemaOptions,
112) -> SchemaRef {
113 let num_fields = if options.raw_pk_columns {
114 metadata.column_metadatas.len() + 3
115 } else {
116 metadata.column_metadatas.len() + 3 - metadata.primary_key.len()
117 };
118 let mut fields = Vec::with_capacity(num_fields);
119 let schema = metadata.schema.arrow_schema();
120 if options.raw_pk_columns {
121 for pk_id in &metadata.primary_key {
122 let pk_index = metadata.column_index_by_id(*pk_id).unwrap();
123 if options.string_pk_use_dict
124 && metadata.column_metadatas[pk_index]
125 .column_schema
126 .data_type
127 .is_string()
128 {
129 let field = &schema.fields[pk_index];
130 let field = Arc::new(Field::new_dictionary(
131 field.name(),
132 datatypes::arrow::datatypes::DataType::UInt32,
133 field.data_type().clone(),
134 field.is_nullable(),
135 ));
136 fields.push(field);
137 } else {
138 fields.push(schema.fields[pk_index].clone());
139 }
140 }
141 }
142 let remaining_fields = schema
143 .fields()
144 .iter()
145 .zip(&metadata.column_metadatas)
146 .filter_map(|(field, column_meta)| {
147 if column_meta.semantic_type == SemanticType::Field {
148 Some(field.clone())
149 } else {
150 None
151 }
152 })
153 .chain([metadata.time_index_field()])
154 .chain(internal_fields());
155 for field in remaining_fields {
156 fields.push(field);
157 }
158
159 Arc::new(Schema::new(fields))
160}
161
162fn internal_fields() -> [FieldRef; 3] {
164 [
166 Arc::new(Field::new_dictionary(
167 PRIMARY_KEY_COLUMN_NAME,
168 ArrowDataType::UInt32,
169 ArrowDataType::Binary,
170 false,
171 )),
172 Arc::new(Field::new(
173 SEQUENCE_COLUMN_NAME,
174 ArrowDataType::UInt64,
175 false,
176 )),
177 Arc::new(Field::new(OP_TYPE_COLUMN_NAME, ArrowDataType::UInt8, false)),
178 ]
179}
180
181pub fn to_plain_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef {
183 let fields = Fields::from_iter(
184 metadata
185 .schema
186 .arrow_schema()
187 .fields()
188 .iter()
189 .cloned()
190 .chain(plain_internal_fields()),
191 );
192
193 Arc::new(Schema::new(fields))
194}
195
196fn plain_internal_fields() -> [FieldRef; 2] {
198 [
200 Arc::new(Field::new(
201 SEQUENCE_COLUMN_NAME,
202 ArrowDataType::UInt64,
203 false,
204 )),
205 Arc::new(Field::new(OP_TYPE_COLUMN_NAME, ArrowDataType::UInt8, false)),
206 ]
207}