common_meta/ddl/create_table/
template.rs1use std::collections::HashMap;
16
17use api::v1::column_def::try_as_column_def;
18use api::v1::meta::Partition;
19use api::v1::region::{CreateRequest, RegionColumnDef};
20use api::v1::{ColumnDef, CreateTableExpr, SemanticType};
21use common_telemetry::warn;
22use snafu::{OptionExt, ResultExt};
23use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
24use store_api::storage::{RegionId, RegionNumber};
25use table::metadata::{RawTableInfo, TableId};
26
27use crate::error::{self, Result};
28use crate::wal_provider::prepare_wal_options;
29
30pub fn build_template_from_raw_table_info(raw_table_info: &RawTableInfo) -> Result<CreateRequest> {
34 let primary_key_indices = &raw_table_info.meta.primary_key_indices;
35 let column_defs = raw_table_info
36 .meta
37 .schema
38 .column_schemas
39 .iter()
40 .enumerate()
41 .map(|(i, c)| {
42 let is_primary_key = primary_key_indices.contains(&i);
43 let column_def = try_as_column_def(c, is_primary_key)
44 .context(error::ConvertColumnDefSnafu { column: &c.name })?;
45
46 Ok(RegionColumnDef {
47 column_def: Some(column_def),
48 column_id: i as u32,
51 })
52 })
53 .collect::<Result<Vec<_>>>()?;
54
55 let options = HashMap::from(&raw_table_info.meta.options);
56 let template = CreateRequest {
57 region_id: 0,
58 engine: raw_table_info.meta.engine.clone(),
59 column_defs,
60 primary_key: primary_key_indices.iter().map(|i| *i as u32).collect(),
61 path: String::new(),
62 options,
63 partition: None,
64 };
65
66 Ok(template)
67}
68
69pub(crate) fn build_template(create_table_expr: &CreateTableExpr) -> Result<CreateRequest> {
70 let column_defs = create_table_expr
71 .column_defs
72 .iter()
73 .enumerate()
74 .map(|(i, c)| {
75 let semantic_type = if create_table_expr.time_index == c.name {
76 SemanticType::Timestamp
77 } else if create_table_expr.primary_keys.contains(&c.name) {
78 SemanticType::Tag
79 } else {
80 SemanticType::Field
81 };
82
83 RegionColumnDef {
84 column_def: Some(ColumnDef {
85 name: c.name.clone(),
86 data_type: c.data_type,
87 is_nullable: c.is_nullable,
88 default_constraint: c.default_constraint.clone(),
89 semantic_type: semantic_type as i32,
90 comment: String::new(),
91 datatype_extension: c.datatype_extension.clone(),
92 options: c.options.clone(),
93 }),
94 column_id: i as u32,
95 }
96 })
97 .collect::<Vec<_>>();
98
99 let primary_key = create_table_expr
100 .primary_keys
101 .iter()
102 .map(|key| {
103 column_defs
104 .iter()
105 .find_map(|c| {
106 c.column_def.as_ref().and_then(|x| {
107 if &x.name == key {
108 Some(c.column_id)
109 } else {
110 None
111 }
112 })
113 })
114 .context(error::PrimaryKeyNotFoundSnafu { key })
115 })
116 .collect::<Result<_>>()?;
117
118 let template = CreateRequest {
119 region_id: 0,
120 engine: create_table_expr.engine.clone(),
121 column_defs,
122 primary_key,
123 path: String::new(),
124 options: create_table_expr.table_options.clone(),
125 partition: None,
126 };
127
128 Ok(template)
129}
130
131pub struct CreateRequestBuilder {
133 template: CreateRequest,
134 physical_table_id: Option<TableId>,
136}
137
138impl CreateRequestBuilder {
139 pub fn new(template: CreateRequest, physical_table_id: Option<TableId>) -> Self {
140 Self {
141 template,
142 physical_table_id,
143 }
144 }
145
146 pub fn template(&self) -> &CreateRequest {
147 &self.template
148 }
149
150 pub fn build_one(
151 &self,
152 region_id: RegionId,
153 storage_path: String,
154 region_wal_options: &HashMap<RegionNumber, String>,
155 partition_exprs: &HashMap<RegionNumber, String>,
156 ) -> CreateRequest {
157 let mut request = self.template.clone();
158
159 request.region_id = region_id.as_u64();
160 request.path = storage_path;
161 prepare_wal_options(&mut request.options, region_id, region_wal_options);
163 request.partition = Some(prepare_partition_expr(region_id, partition_exprs));
164
165 if let Some(physical_table_id) = self.physical_table_id {
166 let physical_region_id = RegionId::new(physical_table_id, region_id.region_number());
170
171 request.options.insert(
172 LOGICAL_TABLE_METADATA_KEY.to_string(),
173 physical_region_id.as_u64().to_string(),
174 );
175 }
176
177 request
178 }
179}
180
181fn prepare_partition_expr(
182 region_id: RegionId,
183 partition_exprs: &HashMap<RegionNumber, String>,
184) -> Partition {
185 let expr = partition_exprs.get(®ion_id.region_number()).cloned();
186 if expr.is_none() {
187 warn!("region {} has no partition expr", region_id);
188 }
189
190 Partition {
191 expression: expr.unwrap_or_default(),
192 ..Default::default()
193 }
194}
195
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use store_api::storage::{RegionId, RegionNumber};

    use super::*;

    /// A region with an entry in the partition expression map must get that
    /// exact expression in the built request.
    #[test]
    fn test_build_one_sets_partition_expr_per_region() {
        let expr_a =
            r#"{"Expr":{"lhs":{"Column":"a"},"op":"Eq","rhs":{"Value":{"UInt32":1}}}}"#.to_string();
        let partition_exprs: HashMap<RegionNumber, String> =
            HashMap::from([(0, expr_a.clone())]);

        // Minimal template: no columns, no options, no physical table.
        let builder = CreateRequestBuilder::new(
            CreateRequest {
                region_id: 0,
                engine: "mito".to_string(),
                column_defs: vec![],
                primary_key: vec![],
                path: String::new(),
                options: HashMap::new(),
                partition: None,
            },
            None,
        );

        let no_wal_options: HashMap<RegionNumber, String> = HashMap::new();
        let request = builder.build_one(
            RegionId::new(42, 0),
            "/p".to_string(),
            &no_wal_options,
            &partition_exprs,
        );

        let built_expr = request.partition.map(|partition| partition.expression);
        assert_eq!(built_expr, Some(expr_a));
    }
}