common_meta/ddl/
create_table_template.rs1use std::collections::HashMap;
16
17use api::v1::column_def::try_as_column_def;
18use api::v1::meta::Partition;
19use api::v1::region::{CreateRequest, RegionColumnDef};
20use api::v1::{ColumnDef, CreateTableExpr, SemanticType};
21use common_telemetry::warn;
22use snafu::{OptionExt, ResultExt};
23use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
24use store_api::storage::{RegionId, RegionNumber};
25use table::metadata::{RawTableInfo, TableId};
26
27use crate::error::{self, Result};
28use crate::wal_options_allocator::prepare_wal_options;
29
30pub(crate) fn build_template_from_raw_table_info(
34 raw_table_info: &RawTableInfo,
35) -> Result<CreateRequest> {
36 let primary_key_indices = &raw_table_info.meta.primary_key_indices;
37 let column_defs = raw_table_info
38 .meta
39 .schema
40 .column_schemas
41 .iter()
42 .enumerate()
43 .map(|(i, c)| {
44 let is_primary_key = primary_key_indices.contains(&i);
45 let column_def = try_as_column_def(c, is_primary_key)
46 .context(error::ConvertColumnDefSnafu { column: &c.name })?;
47
48 Ok(RegionColumnDef {
49 column_def: Some(column_def),
50 column_id: i as u32,
53 })
54 })
55 .collect::<Result<Vec<_>>>()?;
56
57 let options = HashMap::from(&raw_table_info.meta.options);
58 let template = CreateRequest {
59 region_id: 0,
60 engine: METRIC_ENGINE_NAME.to_string(),
61 column_defs,
62 primary_key: primary_key_indices.iter().map(|i| *i as u32).collect(),
63 path: String::new(),
64 options,
65 partition: None,
66 };
67
68 Ok(template)
69}
70
71pub(crate) fn build_template(create_table_expr: &CreateTableExpr) -> Result<CreateRequest> {
72 let column_defs = create_table_expr
73 .column_defs
74 .iter()
75 .enumerate()
76 .map(|(i, c)| {
77 let semantic_type = if create_table_expr.time_index == c.name {
78 SemanticType::Timestamp
79 } else if create_table_expr.primary_keys.contains(&c.name) {
80 SemanticType::Tag
81 } else {
82 SemanticType::Field
83 };
84
85 RegionColumnDef {
86 column_def: Some(ColumnDef {
87 name: c.name.clone(),
88 data_type: c.data_type,
89 is_nullable: c.is_nullable,
90 default_constraint: c.default_constraint.clone(),
91 semantic_type: semantic_type as i32,
92 comment: String::new(),
93 datatype_extension: c.datatype_extension,
94 options: c.options.clone(),
95 }),
96 column_id: i as u32,
97 }
98 })
99 .collect::<Vec<_>>();
100
101 let primary_key = create_table_expr
102 .primary_keys
103 .iter()
104 .map(|key| {
105 column_defs
106 .iter()
107 .find_map(|c| {
108 c.column_def.as_ref().and_then(|x| {
109 if &x.name == key {
110 Some(c.column_id)
111 } else {
112 None
113 }
114 })
115 })
116 .context(error::PrimaryKeyNotFoundSnafu { key })
117 })
118 .collect::<Result<_>>()?;
119
120 let template = CreateRequest {
121 region_id: 0,
122 engine: create_table_expr.engine.to_string(),
123 column_defs,
124 primary_key,
125 path: String::new(),
126 options: create_table_expr.table_options.clone(),
127 partition: None,
128 };
129
130 Ok(template)
131}
132
133pub struct CreateRequestBuilder {
135 template: CreateRequest,
136 physical_table_id: Option<TableId>,
138}
139
140impl CreateRequestBuilder {
141 pub(crate) fn new(template: CreateRequest, physical_table_id: Option<TableId>) -> Self {
142 Self {
143 template,
144 physical_table_id,
145 }
146 }
147
148 pub fn template(&self) -> &CreateRequest {
149 &self.template
150 }
151
152 pub fn build_one(
153 &self,
154 region_id: RegionId,
155 storage_path: String,
156 region_wal_options: &HashMap<RegionNumber, String>,
157 partition_exprs: &HashMap<RegionNumber, String>,
158 ) -> CreateRequest {
159 let mut request = self.template.clone();
160
161 request.region_id = region_id.as_u64();
162 request.path = storage_path;
163 prepare_wal_options(&mut request.options, region_id, region_wal_options);
165 request.partition = Some(prepare_partition_expr(region_id, partition_exprs));
166
167 if let Some(physical_table_id) = self.physical_table_id {
168 let physical_region_id = RegionId::new(physical_table_id, region_id.region_number());
172
173 request.options.insert(
174 LOGICAL_TABLE_METADATA_KEY.to_string(),
175 physical_region_id.as_u64().to_string(),
176 );
177 }
178
179 request
180 }
181}
182
183fn prepare_partition_expr(
184 region_id: RegionId,
185 partition_exprs: &HashMap<RegionNumber, String>,
186) -> Partition {
187 let expr = partition_exprs.get(®ion_id.region_number()).cloned();
188 if expr.is_none() {
189 warn!("region {} has no partition expr", region_id);
190 }
191
192 Partition {
193 expression: expr.unwrap_or_default(),
194 ..Default::default()
195 }
196}
197
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use store_api::storage::{RegionId, RegionNumber};

    use super::*;

    /// `build_one` must attach the partition expression registered for the region
    /// number of the region being built.
    #[test]
    fn test_build_one_sets_partition_expr_per_region() {
        // A minimal template: no columns, no primary key, no options.
        let builder = CreateRequestBuilder::new(
            CreateRequest {
                region_id: 0,
                engine: "mito".to_string(),
                column_defs: vec![],
                primary_key: vec![],
                path: String::new(),
                options: Default::default(),
                partition: None,
            },
            None,
        );

        let expected_expr =
            r#"{"Expr":{"lhs":{"Column":"a"},"op":"Eq","rhs":{"Value":{"UInt32":1}}}}"#.to_string();
        let partition_exprs: HashMap<RegionNumber, String> =
            HashMap::from([(0, expected_expr.clone())]);
        let no_wal_options = HashMap::new();

        let request = builder.build_one(
            RegionId::new(42, 0),
            "/p".to_string(),
            &no_wal_options,
            &partition_exprs,
        );

        let partition = request.partition.as_ref().unwrap();
        assert_eq!(partition.expression, expected_expr);
    }
}