1pub mod executor;
16pub mod template;
17
18use std::collections::HashMap;
19
20use api::v1::CreateTableExpr;
21use async_trait::async_trait;
22use common_error::ext::BoxedError;
23use common_procedure::error::{
24 ExternalSnafu, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
25};
26use common_procedure::{Context as ProcedureContext, LockKey, Procedure, ProcedureId, Status};
27use common_telemetry::info;
28use serde::{Deserialize, Serialize};
29use snafu::{OptionExt, ResultExt};
30use store_api::metadata::ColumnMetadata;
31use store_api::storage::RegionNumber;
32use strum::AsRefStr;
33use table::metadata::{TableId, TableInfo};
34use table::table_name::TableName;
35use table::table_reference::TableReference;
36pub(crate) use template::{CreateRequestBuilder, build_template_from_raw_table_info};
37
38use crate::ddl::create_table::executor::CreateTableExecutor;
39use crate::ddl::create_table::template::build_template;
40use crate::ddl::utils::map_to_procedure_error;
41use crate::ddl::{DdlContext, TableMetadata};
42use crate::error::{self, Result};
43use crate::key::table_route::PhysicalTableRouteValue;
44use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock};
45use crate::metrics;
46use crate::region_keeper::OperatingRegionGuard;
47use crate::rpc::ddl::CreateTableTask;
48use crate::rpc::router::{RegionRoute, operating_leader_region_roles};
49
/// Procedure that creates a table, advancing step by step through [`CreateTableState`].
pub struct CreateTableProcedure {
    /// Shared DDL context. This file reads the table metadata manager, the table
    /// metadata allocator, the node manager, the memory region keeper and the
    /// region failure detector controller from it.
    pub context: DdlContext,
    /// The serializable data: written out by `dump` and read back by `from_json`.
    pub data: CreateTableData,
    /// Guards obtained from the memory region keeper for the regions being
    /// created; cleared once the table metadata has been created.
    pub opening_regions: Vec<OperatingRegionGuard>,
    /// Executor that carries out each step; built from the task's `CreateTableExpr`.
    pub executor: CreateTableExecutor,
}
59
60fn build_executor_from_create_table_data(
61 create_table_expr: &CreateTableExpr,
62) -> Result<CreateTableExecutor> {
63 let template = build_template(create_table_expr)?;
64 let builder = CreateRequestBuilder::new(template, None);
65 let table_name = TableName::new(
66 create_table_expr.catalog_name.clone(),
67 create_table_expr.schema_name.clone(),
68 create_table_expr.table_name.clone(),
69 );
70 let executor =
71 CreateTableExecutor::new(table_name, create_table_expr.create_if_not_exists, builder);
72 Ok(executor)
73}
74
75impl CreateTableProcedure {
76 pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateTable";
77
78 pub fn new(task: CreateTableTask, context: DdlContext) -> Result<Self> {
79 let executor = build_executor_from_create_table_data(&task.create_table)?;
80
81 Ok(Self {
82 context,
83 data: CreateTableData::new(task),
84 opening_regions: vec![],
85 executor,
86 })
87 }
88
89 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
90 let data: CreateTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
91 let create_table_expr = &data.task.create_table;
92 let executor = build_executor_from_create_table_data(create_table_expr)
93 .map_err(BoxedError::new)
94 .context(ExternalSnafu {
95 clean_poisons: false,
96 })?;
97
98 Ok(CreateTableProcedure {
99 context,
100 data,
101 opening_regions: vec![],
102 executor,
103 })
104 }
105
106 fn table_info(&self) -> &TableInfo {
107 &self.data.task.table_info
108 }
109
110 pub(crate) fn table_id(&self) -> TableId {
111 self.table_info().ident.table_id
112 }
113
114 fn region_wal_options(&self) -> Result<&HashMap<RegionNumber, String>> {
115 self.data
116 .region_wal_options
117 .as_ref()
118 .context(error::UnexpectedSnafu {
119 err_msg: "region_wal_options is not allocated",
120 })
121 }
122
123 fn table_route(&self) -> Result<&PhysicalTableRouteValue> {
124 self.data
125 .table_route
126 .as_ref()
127 .context(error::UnexpectedSnafu {
128 err_msg: "table_route is not allocated",
129 })
130 }
131
132 pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
140 let table_id = self
141 .executor
142 .on_prepare(&self.context.table_metadata_manager)
143 .await?;
144 if let Some(table_id) = table_id {
146 return Ok(Status::done_with_output(table_id));
147 }
148
149 self.data.state = CreateTableState::DatanodeCreateRegions;
150 let TableMetadata {
151 table_id,
152 table_route,
153 region_wal_options,
154 } = self
155 .context
156 .table_metadata_allocator
157 .create(&self.data.task)
158 .await?;
159 self.set_allocated_metadata(table_id, table_route, region_wal_options);
160
161 Ok(Status::executing(true))
162 }
163
164 pub async fn on_datanode_create_regions(&mut self, retrying: bool) -> Result<Status> {
176 let mut table_route = self.table_route()?.clone();
177 if retrying {
178 info!(
179 "Remapping region routes addresses for retrying create regions for table: {}",
180 self.data.table_ref()
181 );
182 let storage = self
183 .context
184 .table_metadata_manager
185 .table_route_manager()
186 .table_route_storage();
187 storage
190 .remap_region_routes(&mut table_route.region_routes)
191 .await?;
192 }
193 let guards = self.register_opening_regions(&self.context, &table_route.region_routes)?;
195 if !guards.is_empty() {
196 self.opening_regions = guards;
197 }
198 self.create_regions(&table_route.region_routes).await
199 }
200
201 async fn create_regions(&mut self, region_routes: &[RegionRoute]) -> Result<Status> {
202 let table_id = self.table_id();
203 let region_wal_options = self.region_wal_options()?;
204 let column_metadatas = self
205 .executor
206 .on_create_regions(
207 &self.context.node_manager,
208 table_id,
209 region_routes,
210 region_wal_options,
211 )
212 .await?;
213
214 self.data.column_metadatas = column_metadatas;
215 self.data.state = CreateTableState::CreateMetadata;
216 Ok(Status::executing(true))
217 }
218
219 async fn on_create_metadata(&mut self, pid: ProcedureId) -> Result<Status> {
224 let table_id = self.table_id();
225 let table_ref = self.data.table_ref();
226 let manager = &self.context.table_metadata_manager;
227
228 let raw_table_info = self.table_info().clone();
229 let region_wal_options = self.region_wal_options()?.clone();
231 let physical_table_route = self.table_route()?.clone();
233 self.executor
234 .on_create_metadata(
235 manager,
236 &self.context.region_failure_detector_controller,
237 raw_table_info,
238 &self.data.column_metadatas,
239 physical_table_route,
240 region_wal_options,
241 )
242 .await?;
243
244 info!(
245 "Successfully created table: {}, table_id: {}, procedure_id: {}",
246 table_ref, table_id, pid
247 );
248
249 self.opening_regions.clear();
250 Ok(Status::done_with_output(table_id))
251 }
252
253 fn register_opening_regions(
255 &self,
256 context: &DdlContext,
257 region_routes: &[RegionRoute],
258 ) -> Result<Vec<OperatingRegionGuard>> {
259 let opening_regions = operating_leader_region_roles(region_routes);
260 if self.opening_regions.len() == opening_regions.len() {
261 return Ok(vec![]);
262 }
263
264 let mut opening_region_guards = Vec::with_capacity(opening_regions.len());
265
266 for (region_id, datanode_id, role) in opening_regions {
267 let guard = context
268 .memory_region_keeper
269 .register_with_role(datanode_id, region_id, role)
270 .context(error::RegionOperatingRaceSnafu {
271 region_id,
272 peer_id: datanode_id,
273 })?;
274 opening_region_guards.push(guard);
275 }
276 Ok(opening_region_guards)
277 }
278
279 pub fn set_allocated_metadata(
280 &mut self,
281 table_id: TableId,
282 table_route: PhysicalTableRouteValue,
283 region_wal_options: HashMap<RegionNumber, String>,
284 ) {
285 self.data.task.table_info.ident.table_id = table_id;
286 self.data.table_route = Some(table_route);
287 self.data.region_wal_options = Some(region_wal_options);
288 }
289}
290
291#[async_trait]
292impl Procedure for CreateTableProcedure {
293 fn type_name(&self) -> &str {
294 Self::TYPE_NAME
295 }
296
297 fn recover(&mut self) -> ProcedureResult<()> {
298 if let Some(x) = &self.data.table_route {
300 self.opening_regions = self
301 .register_opening_regions(&self.context, &x.region_routes)
302 .map_err(BoxedError::new)
303 .context(ExternalSnafu {
304 clean_poisons: false,
305 })?;
306 }
307
308 Ok(())
309 }
310
311 async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
312 let state = &self.data.state;
313
314 let _timer = metrics::METRIC_META_PROCEDURE_CREATE_TABLE
315 .with_label_values(&[state.as_ref()])
316 .start_timer();
317
318 match state {
319 CreateTableState::Prepare => self.on_prepare().await,
320 CreateTableState::DatanodeCreateRegions => {
321 let retrying = ctx.is_retrying().await.unwrap_or(false);
322 self.on_datanode_create_regions(retrying).await
323 }
324 CreateTableState::CreateMetadata => self.on_create_metadata(ctx.procedure_id).await,
325 }
326 .map_err(map_to_procedure_error)
327 }
328
329 fn dump(&self) -> ProcedureResult<String> {
330 serde_json::to_string(&self.data).context(ToJsonSnafu)
331 }
332
333 fn lock_key(&self) -> LockKey {
334 let table_ref = &self.data.table_ref();
335
336 LockKey::new(vec![
337 CatalogLock::Read(table_ref.catalog).into(),
338 SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
339 TableNameLock::new(table_ref.catalog, table_ref.schema, table_ref.table).into(),
340 ])
341 }
342}
343
/// The steps of [`CreateTableProcedure`]; `execute` dispatches on the current
/// one and uses its name (via `AsRefStr`) as the metric label.
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
pub enum CreateTableState {
    /// Initial step, handled by `on_prepare`.
    Prepare,
    /// Creates the regions on the datanodes, handled by `on_datanode_create_regions`.
    DatanodeCreateRegions,
    /// Creates the table metadata, handled by `on_create_metadata`.
    CreateMetadata,
}
353
/// The serializable data of [`CreateTableProcedure`].
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateTableData {
    /// The step the procedure executes next.
    pub state: CreateTableState,
    /// The create-table task being carried out.
    pub task: CreateTableTask,
    /// Column metadatas returned by the region-creation step. Defaults to an
    /// empty vector when the field is missing from the serialized input.
    #[serde(default)]
    pub column_metadatas: Vec<ColumnMetadata>,
    /// `None` until set by `set_allocated_metadata`.
    pub(crate) table_route: Option<PhysicalTableRouteValue>,
    /// `None` until set by `set_allocated_metadata`.
    pub region_wal_options: Option<HashMap<RegionNumber, String>>,
}
365
366impl CreateTableData {
367 pub fn new(task: CreateTableTask) -> Self {
368 CreateTableData {
369 state: CreateTableState::Prepare,
370 column_metadatas: vec![],
371 task,
372 table_route: None,
373 region_wal_options: None,
374 }
375 }
376
377 fn table_ref(&self) -> TableReference<'_> {
378 self.task.table_ref()
379 }
380}