common_meta/ddl/create_table/
template.rs1use std::collections::HashMap;
16
17use api::v1::column_def::try_as_column_def;
18use api::v1::meta::Partition;
19use api::v1::region::{CreateRequest, RegionColumnDef};
20use api::v1::{ColumnDef, CreateTableExpr, SemanticType};
21use common_telemetry::warn;
22use snafu::{OptionExt, ResultExt};
23use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
24use store_api::region_request::RegionRequirements;
25use store_api::storage::{RegionId, RegionNumber};
26use table::metadata::{TableId, TableInfo};
27
28use crate::error::{self, Result, SerializeWalOptionsSnafu};
29use crate::reconciliation::utils::build_column_metadata_from_table_info;
30use crate::wal_provider::{RegionWalOptions, serialize_wal_options};
31
32pub fn build_template_from_raw_table_info(table_info: &TableInfo) -> Result<CreateRequest> {
40 let primary_key_indices = &table_info.meta.primary_key_indices;
41 let column_defs = table_info
42 .meta
43 .schema
44 .column_schemas()
45 .iter()
46 .enumerate()
47 .map(|(i, c)| {
48 let is_primary_key = primary_key_indices.contains(&i);
49 let column_def = try_as_column_def(c, is_primary_key)
50 .context(error::ConvertColumnDefSnafu { column: &c.name })?;
51 Ok(RegionColumnDef {
52 column_def: Some(column_def),
53 column_id: i as u32,
56 })
57 })
58 .collect::<Result<Vec<_>>>()?;
59
60 let options = HashMap::from(&table_info.meta.options);
61 let template = CreateRequest {
62 region_id: 0,
63 engine: table_info.meta.engine.clone(),
64 column_defs,
65 primary_key: table_info
66 .meta
67 .primary_key_indices
68 .iter()
69 .map(|i| *i as u32)
70 .collect(),
71 path: String::new(),
72 options,
73 partition: None,
74 requirements: None,
75 };
76
77 Ok(template)
78}
79
80pub fn build_template_from_raw_table_info_for_physical_table(
87 table_info: &TableInfo,
88) -> Result<CreateRequest> {
89 let name_to_ids = table_info
90 .name_to_ids()
91 .context(error::MissingColumnIdsSnafu)?;
92 let column_metadatas = build_column_metadata_from_table_info(
93 table_info.meta.schema.column_schemas(),
94 &table_info.meta.primary_key_indices,
95 &name_to_ids,
96 )?;
97 let column_defs = column_metadatas
98 .iter()
99 .map(|c| {
100 let column_def =
101 try_as_column_def(&c.column_schema, c.semantic_type == SemanticType::Tag).context(
102 error::ConvertColumnDefSnafu {
103 column: &c.column_schema.name,
104 },
105 )?;
106 let region_column_def = RegionColumnDef {
107 column_def: Some(column_def),
108 column_id: c.column_id,
109 };
110
111 Ok(region_column_def)
112 })
113 .collect::<Result<Vec<_>>>()?;
114 let primary_key = table_info
116 .meta
117 .primary_key_indices
118 .iter()
119 .map(|idx| column_metadatas[*idx].column_id)
120 .collect();
121
122 let options = HashMap::from(&table_info.meta.options);
123 let template = CreateRequest {
124 region_id: 0,
125 engine: table_info.meta.engine.clone(),
126 column_defs,
127 primary_key,
128 path: String::new(),
129 options,
130 partition: None,
131 requirements: None,
132 };
133
134 Ok(template)
135}
136
137pub(crate) fn build_template(create_table_expr: &CreateTableExpr) -> Result<CreateRequest> {
138 let column_defs = create_table_expr
139 .column_defs
140 .iter()
141 .enumerate()
142 .map(|(i, c)| {
143 let semantic_type = if create_table_expr.time_index == c.name {
144 SemanticType::Timestamp
145 } else if create_table_expr.primary_keys.contains(&c.name) {
146 SemanticType::Tag
147 } else {
148 SemanticType::Field
149 };
150
151 RegionColumnDef {
152 column_def: Some(ColumnDef {
153 name: c.name.clone(),
154 data_type: c.data_type,
155 is_nullable: c.is_nullable,
156 default_constraint: c.default_constraint.clone(),
157 semantic_type: semantic_type as i32,
158 comment: String::new(),
159 datatype_extension: c.datatype_extension.clone(),
160 options: c.options.clone(),
161 }),
162 column_id: i as u32,
163 }
164 })
165 .collect::<Vec<_>>();
166
167 let primary_key = create_table_expr
168 .primary_keys
169 .iter()
170 .map(|key| {
171 column_defs
172 .iter()
173 .find_map(|c| {
174 c.column_def.as_ref().and_then(|x| {
175 if &x.name == key {
176 Some(c.column_id)
177 } else {
178 None
179 }
180 })
181 })
182 .context(error::PrimaryKeyNotFoundSnafu { key })
183 })
184 .collect::<Result<_>>()?;
185
186 let template = CreateRequest {
187 region_id: 0,
188 engine: create_table_expr.engine.clone(),
189 column_defs,
190 primary_key,
191 path: String::new(),
192 options: create_table_expr.table_options.clone(),
193 partition: None,
194 requirements: None,
195 };
196
197 Ok(template)
198}
199
200pub struct CreateRequestBuilder {
202 template: CreateRequest,
203 physical_table_id: Option<TableId>,
205 requirements: RegionRequirements,
206}
207
208impl CreateRequestBuilder {
209 pub fn new(template: CreateRequest, physical_table_id: Option<TableId>) -> Self {
210 Self {
211 template,
212 physical_table_id,
213 requirements: RegionRequirements::empty(),
214 }
215 }
216
217 pub fn template(&self) -> &CreateRequest {
218 &self.template
219 }
220
221 pub fn with_requirements(mut self, requirements: RegionRequirements) -> Self {
222 self.requirements = requirements;
223 self
224 }
225
226 pub fn build_one(
227 &self,
228 region_id: RegionId,
229 storage_path: String,
230 region_wal_options: &RegionWalOptions,
231 partition_exprs: &HashMap<RegionNumber, String>,
232 ) -> Result<CreateRequest> {
233 let mut request = self.template.clone();
234
235 request.region_id = region_id.as_u64();
236 request.path = storage_path;
237 request.requirements = Some(self.requirements.into());
238 serialize_wal_options(&mut request.options, region_id, region_wal_options)
240 .context(SerializeWalOptionsSnafu { region_id })?;
241 request.partition = Some(prepare_partition_expr(region_id, partition_exprs));
242
243 if let Some(physical_table_id) = self.physical_table_id {
244 let physical_region_id = RegionId::new(physical_table_id, region_id.region_number());
248
249 request.options.insert(
250 LOGICAL_TABLE_METADATA_KEY.to_string(),
251 physical_region_id.as_u64().to_string(),
252 );
253 }
254
255 Ok(request)
256 }
257}
258
259fn prepare_partition_expr(
260 region_id: RegionId,
261 partition_exprs: &HashMap<RegionNumber, String>,
262) -> Partition {
263 let expr = partition_exprs.get(®ion_id.region_number()).cloned();
264 if expr.is_none() {
265 warn!("region {} has no partition expr", region_id);
266 }
267
268 Partition {
269 expression: expr.unwrap_or_default(),
270 ..Default::default()
271 }
272}
273
274#[cfg(test)]
275mod tests {
276 use std::collections::HashMap;
277
278 use store_api::storage::{RegionId, RegionNumber};
279
280 use super::*;
281 use crate::key::test_utils;
282
283 #[test]
284 fn test_build_one_sets_partition_expr_per_region() {
285 let template = CreateRequest {
287 region_id: 0,
288 engine: "mito".to_string(),
289 column_defs: vec![],
290 primary_key: vec![],
291 path: String::new(),
292 options: Default::default(),
293 partition: None,
294 requirements: None,
295 };
296 let builder = CreateRequestBuilder::new(template, None);
297
298 let mut partition_exprs: HashMap<RegionNumber, String> = HashMap::new();
299 let expr_a =
300 r#"{"Expr":{"lhs":{"Column":"a"},"op":"Eq","rhs":{"Value":{"UInt32":1}}}}"#.to_string();
301 partition_exprs.insert(0, expr_a.clone());
302
303 let r0 = builder
304 .build_one(
305 RegionId::new(42, 0),
306 "/p".to_string(),
307 &Default::default(),
308 &partition_exprs,
309 )
310 .unwrap();
311 assert_eq!(r0.partition.as_ref().unwrap().expression, expr_a);
312 assert_eq!(
313 r0.requirements.map(RegionRequirements::from),
314 Some(RegionRequirements::empty())
315 );
316 }
317
318 #[test]
319 fn test_build_one_sets_explicit_requirements() {
320 let template = CreateRequest {
321 region_id: 0,
322 engine: "mito".to_string(),
323 column_defs: vec![],
324 primary_key: vec![],
325 path: String::new(),
326 options: Default::default(),
327 partition: None,
328 requirements: None,
329 };
330 let builder = CreateRequestBuilder::new(template, None)
331 .with_requirements(RegionRequirements::object_storage());
332
333 let request = builder
334 .build_one(
335 RegionId::new(42, 0),
336 "/p".to_string(),
337 &Default::default(),
338 &Default::default(),
339 )
340 .unwrap();
341
342 assert_eq!(
343 request.requirements.map(RegionRequirements::from),
344 Some(RegionRequirements::object_storage())
345 );
346 }
347
348 #[test]
349 fn test_build_template_for_physical_table_primary_key_matches_indices() {
350 let mut table_info = test_utils::new_test_table_info(42);
351 table_info.meta.primary_key_indices = vec![0, 2];
352 table_info.meta.column_ids = vec![10, 20, 30];
353
354 let template = build_template_from_raw_table_info_for_physical_table(&table_info).unwrap();
355 assert_eq!(template.primary_key, vec![10, 30]);
356 }
357}