common_meta/ddl/create_table/
template.rs1use std::collections::HashMap;
16
17use api::v1::column_def::try_as_column_def;
18use api::v1::meta::Partition;
19use api::v1::region::{CreateRequest, RegionColumnDef};
20use api::v1::{ColumnDef, CreateTableExpr, SemanticType};
21use common_telemetry::warn;
22use snafu::{OptionExt, ResultExt};
23use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
24use store_api::storage::{RegionId, RegionNumber};
25use table::metadata::{TableId, TableInfo};
26
27use crate::error::{self, Result};
28use crate::reconciliation::utils::build_column_metadata_from_table_info;
29use crate::wal_provider::prepare_wal_options;
30
31pub fn build_template_from_raw_table_info(table_info: &TableInfo) -> Result<CreateRequest> {
39 let primary_key_indices = &table_info.meta.primary_key_indices;
40 let column_defs = table_info
41 .meta
42 .schema
43 .column_schemas()
44 .iter()
45 .enumerate()
46 .map(|(i, c)| {
47 let is_primary_key = primary_key_indices.contains(&i);
48 let column_def = try_as_column_def(c, is_primary_key)
49 .context(error::ConvertColumnDefSnafu { column: &c.name })?;
50 Ok(RegionColumnDef {
51 column_def: Some(column_def),
52 column_id: i as u32,
55 })
56 })
57 .collect::<Result<Vec<_>>>()?;
58
59 let options = HashMap::from(&table_info.meta.options);
60 let template = CreateRequest {
61 region_id: 0,
62 engine: table_info.meta.engine.clone(),
63 column_defs,
64 primary_key: table_info
65 .meta
66 .primary_key_indices
67 .iter()
68 .map(|i| *i as u32)
69 .collect(),
70 path: String::new(),
71 options,
72 partition: None,
73 };
74
75 Ok(template)
76}
77
78pub fn build_template_from_raw_table_info_for_physical_table(
85 table_info: &TableInfo,
86) -> Result<CreateRequest> {
87 let name_to_ids = table_info
88 .name_to_ids()
89 .context(error::MissingColumnIdsSnafu)?;
90 let column_metadatas = build_column_metadata_from_table_info(
91 table_info.meta.schema.column_schemas(),
92 &table_info.meta.primary_key_indices,
93 &name_to_ids,
94 )?;
95 let column_defs = column_metadatas
96 .iter()
97 .map(|c| {
98 let column_def =
99 try_as_column_def(&c.column_schema, c.semantic_type == SemanticType::Tag).context(
100 error::ConvertColumnDefSnafu {
101 column: &c.column_schema.name,
102 },
103 )?;
104 let region_column_def = RegionColumnDef {
105 column_def: Some(column_def),
106 column_id: c.column_id,
107 };
108
109 Ok(region_column_def)
110 })
111 .collect::<Result<Vec<_>>>()?;
112 let primary_key = table_info
114 .meta
115 .primary_key_indices
116 .iter()
117 .map(|idx| column_metadatas[*idx].column_id)
118 .collect();
119
120 let options = HashMap::from(&table_info.meta.options);
121 let template = CreateRequest {
122 region_id: 0,
123 engine: table_info.meta.engine.clone(),
124 column_defs,
125 primary_key,
126 path: String::new(),
127 options,
128 partition: None,
129 };
130
131 Ok(template)
132}
133
134pub(crate) fn build_template(create_table_expr: &CreateTableExpr) -> Result<CreateRequest> {
135 let column_defs = create_table_expr
136 .column_defs
137 .iter()
138 .enumerate()
139 .map(|(i, c)| {
140 let semantic_type = if create_table_expr.time_index == c.name {
141 SemanticType::Timestamp
142 } else if create_table_expr.primary_keys.contains(&c.name) {
143 SemanticType::Tag
144 } else {
145 SemanticType::Field
146 };
147
148 RegionColumnDef {
149 column_def: Some(ColumnDef {
150 name: c.name.clone(),
151 data_type: c.data_type,
152 is_nullable: c.is_nullable,
153 default_constraint: c.default_constraint.clone(),
154 semantic_type: semantic_type as i32,
155 comment: String::new(),
156 datatype_extension: c.datatype_extension.clone(),
157 options: c.options.clone(),
158 }),
159 column_id: i as u32,
160 }
161 })
162 .collect::<Vec<_>>();
163
164 let primary_key = create_table_expr
165 .primary_keys
166 .iter()
167 .map(|key| {
168 column_defs
169 .iter()
170 .find_map(|c| {
171 c.column_def.as_ref().and_then(|x| {
172 if &x.name == key {
173 Some(c.column_id)
174 } else {
175 None
176 }
177 })
178 })
179 .context(error::PrimaryKeyNotFoundSnafu { key })
180 })
181 .collect::<Result<_>>()?;
182
183 let template = CreateRequest {
184 region_id: 0,
185 engine: create_table_expr.engine.clone(),
186 column_defs,
187 primary_key,
188 path: String::new(),
189 options: create_table_expr.table_options.clone(),
190 partition: None,
191 };
192
193 Ok(template)
194}
195
196pub struct CreateRequestBuilder {
198 template: CreateRequest,
199 physical_table_id: Option<TableId>,
201}
202
203impl CreateRequestBuilder {
204 pub fn new(template: CreateRequest, physical_table_id: Option<TableId>) -> Self {
205 Self {
206 template,
207 physical_table_id,
208 }
209 }
210
211 pub fn template(&self) -> &CreateRequest {
212 &self.template
213 }
214
215 pub fn build_one(
216 &self,
217 region_id: RegionId,
218 storage_path: String,
219 region_wal_options: &HashMap<RegionNumber, String>,
220 partition_exprs: &HashMap<RegionNumber, String>,
221 ) -> CreateRequest {
222 let mut request = self.template.clone();
223
224 request.region_id = region_id.as_u64();
225 request.path = storage_path;
226 prepare_wal_options(&mut request.options, region_id, region_wal_options);
228 request.partition = Some(prepare_partition_expr(region_id, partition_exprs));
229
230 if let Some(physical_table_id) = self.physical_table_id {
231 let physical_region_id = RegionId::new(physical_table_id, region_id.region_number());
235
236 request.options.insert(
237 LOGICAL_TABLE_METADATA_KEY.to_string(),
238 physical_region_id.as_u64().to_string(),
239 );
240 }
241
242 request
243 }
244}
245
246fn prepare_partition_expr(
247 region_id: RegionId,
248 partition_exprs: &HashMap<RegionNumber, String>,
249) -> Partition {
250 let expr = partition_exprs.get(®ion_id.region_number()).cloned();
251 if expr.is_none() {
252 warn!("region {} has no partition expr", region_id);
253 }
254
255 Partition {
256 expression: expr.unwrap_or_default(),
257 ..Default::default()
258 }
259}
260
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use store_api::storage::{RegionId, RegionNumber};

    use super::*;
    use crate::key::test_utils;

    /// `build_one` attaches the partition expression registered for the region's number.
    #[test]
    fn test_build_one_sets_partition_expr_per_region() {
        let builder = CreateRequestBuilder::new(
            CreateRequest {
                region_id: 0,
                engine: "mito".to_string(),
                column_defs: vec![],
                primary_key: vec![],
                path: String::new(),
                options: Default::default(),
                partition: None,
            },
            None,
        );

        let expected_expr =
            r#"{"Expr":{"lhs":{"Column":"a"},"op":"Eq","rhs":{"Value":{"UInt32":1}}}}"#.to_string();
        let partition_exprs: HashMap<RegionNumber, String> =
            HashMap::from([(0, expected_expr.clone())]);
        let no_wal_options: HashMap<RegionNumber, String> = HashMap::new();

        let request = builder.build_one(
            RegionId::new(42, 0),
            "/p".to_string(),
            &no_wal_options,
            &partition_exprs,
        );

        let partition = request
            .partition
            .as_ref()
            .expect("build_one always sets the partition");
        assert_eq!(partition.expression, expected_expr);
    }

    /// The physical-table template lists the column ids found at the primary-key indices.
    #[test]
    fn test_build_template_for_physical_table_primary_key_matches_indices() {
        let mut table_info = test_utils::new_test_table_info(42);
        table_info.meta.primary_key_indices = vec![0, 2];
        table_info.meta.column_ids = vec![10, 20, 30];

        let primary_key = build_template_from_raw_table_info_for_physical_table(&table_info)
            .unwrap()
            .primary_key;
        assert_eq!(primary_key, vec![10, 30]);
    }
}