1pub mod executor;
16pub mod template;
17
18use std::collections::HashMap;
19
20use api::v1::CreateTableExpr;
21use async_trait::async_trait;
22use common_error::ext::BoxedError;
23use common_procedure::error::{
24 ExternalSnafu, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
25};
26use common_procedure::{Context as ProcedureContext, LockKey, Procedure, ProcedureId, Status};
27use common_telemetry::info;
28use serde::{Deserialize, Serialize};
29use snafu::{OptionExt, ResultExt};
30use store_api::metadata::ColumnMetadata;
31use store_api::storage::RegionNumber;
32use strum::AsRefStr;
33use table::metadata::{TableId, TableInfo};
34use table::table_name::TableName;
35use table::table_reference::TableReference;
36pub(crate) use template::{CreateRequestBuilder, build_template_from_raw_table_info};
37
38use crate::ddl::create_table::executor::CreateTableExecutor;
39use crate::ddl::create_table::template::build_template;
40use crate::ddl::utils::map_to_procedure_error;
41use crate::ddl::{DdlContext, TableMetadata};
42use crate::error::{self, Result};
43use crate::key::table_route::PhysicalTableRouteValue;
44use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock};
45use crate::metrics;
46use crate::peer::PeerAllocContext;
47use crate::region_keeper::OperatingRegionGuard;
48use crate::rpc::ddl::{CreateTableTask, QueryContext};
49use crate::rpc::router::{RegionRoute, operating_leader_region_roles};
50
/// DDL procedure that creates a table backed by a physical table route.
///
/// Execution walks through [`CreateTableState`]: `Prepare` (may finish early with an
/// existing table id, otherwise allocates table id / route / WAL options),
/// `DatanodeCreateRegions`, then `CreateMetadata`.
pub struct CreateTableProcedure {
    /// Shared DDL dependencies (table metadata manager and allocator, node manager,
    /// memory region keeper, region failure detector controller, ...).
    pub context: DdlContext,
    /// Persisted procedure state: serialized by `dump` and restored by `from_json`.
    pub data: CreateTableData,
    /// Guards returned by the memory region keeper for the regions being opened.
    /// Cleared after table metadata is created.
    /// NOTE(review): presumably dropping a guard releases the registration — confirm.
    pub opening_regions: Vec<OperatingRegionGuard>,
    /// Performs the per-step work (prepare check, region creation, metadata creation).
    pub executor: CreateTableExecutor,
}
60
61fn build_executor_from_create_table_data(
62 create_table_expr: &CreateTableExpr,
63) -> Result<CreateTableExecutor> {
64 let template = build_template(create_table_expr)?;
65 let builder = CreateRequestBuilder::new(template, None);
66 let table_name = TableName::new(
67 create_table_expr.catalog_name.clone(),
68 create_table_expr.schema_name.clone(),
69 create_table_expr.table_name.clone(),
70 );
71 let executor =
72 CreateTableExecutor::new(table_name, create_table_expr.create_if_not_exists, builder);
73 Ok(executor)
74}
75
76impl CreateTableProcedure {
77 pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateTable";
78
79 pub fn new(task: CreateTableTask, context: DdlContext) -> Result<Self> {
80 Self::new_with_query_context(task, QueryContext::default(), context)
81 }
82
83 pub fn new_with_query_context(
84 task: CreateTableTask,
85 query_context: QueryContext,
86 context: DdlContext,
87 ) -> Result<Self> {
88 let executor = build_executor_from_create_table_data(&task.create_table)?;
89
90 Ok(Self {
91 context,
92 data: CreateTableData::new(task, query_context),
93 opening_regions: vec![],
94 executor,
95 })
96 }
97
98 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
99 let data: CreateTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
100 let create_table_expr = &data.task.create_table;
101 let executor = build_executor_from_create_table_data(create_table_expr)
102 .map_err(BoxedError::new)
103 .context(ExternalSnafu {
104 clean_poisons: false,
105 })?;
106
107 Ok(CreateTableProcedure {
108 context,
109 data,
110 opening_regions: vec![],
111 executor,
112 })
113 }
114
115 fn table_info(&self) -> &TableInfo {
116 &self.data.task.table_info
117 }
118
119 pub(crate) fn table_id(&self) -> TableId {
120 self.table_info().ident.table_id
121 }
122
123 fn region_wal_options(&self) -> Result<&HashMap<RegionNumber, String>> {
124 self.data
125 .region_wal_options
126 .as_ref()
127 .context(error::UnexpectedSnafu {
128 err_msg: "region_wal_options is not allocated",
129 })
130 }
131
132 fn table_route(&self) -> Result<&PhysicalTableRouteValue> {
133 self.data
134 .table_route
135 .as_ref()
136 .context(error::UnexpectedSnafu {
137 err_msg: "table_route is not allocated",
138 })
139 }
140
141 pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
149 let table_id = self
150 .executor
151 .on_prepare(&self.context.table_metadata_manager)
152 .await?;
153 if let Some(table_id) = table_id {
155 return Ok(Status::done_with_output(table_id));
156 }
157
158 self.data.state = CreateTableState::DatanodeCreateRegions;
159 let TableMetadata {
160 table_id,
161 table_route,
162 region_wal_options,
163 } = self
164 .context
165 .table_metadata_allocator
166 .create_with_context(
167 &self.data.task,
168 &PeerAllocContext {
169 extensions: self.data.query_context.extensions.clone(),
170 },
171 )
172 .await?;
173 self.set_allocated_metadata(table_id, table_route, region_wal_options);
174
175 Ok(Status::executing(true))
176 }
177
178 pub async fn on_datanode_create_regions(&mut self, retrying: bool) -> Result<Status> {
190 let mut table_route = self.table_route()?.clone();
191 if retrying {
192 info!(
193 "Remapping region routes addresses for retrying create regions for table: {}",
194 self.data.table_ref()
195 );
196 let storage = self
197 .context
198 .table_metadata_manager
199 .table_route_manager()
200 .table_route_storage();
201 storage
204 .remap_region_routes(&mut table_route.region_routes)
205 .await?;
206 }
207 let guards = self.register_opening_regions(&self.context, &table_route.region_routes)?;
209 if !guards.is_empty() {
210 self.opening_regions = guards;
211 }
212 self.create_regions(&table_route.region_routes).await
213 }
214
215 async fn create_regions(&mut self, region_routes: &[RegionRoute]) -> Result<Status> {
216 let table_id = self.table_id();
217 let region_wal_options = self.region_wal_options()?;
218 let column_metadatas = self
219 .executor
220 .on_create_regions(
221 &self.context.node_manager,
222 table_id,
223 region_routes,
224 region_wal_options,
225 )
226 .await?;
227
228 self.data.column_metadatas = column_metadatas;
229 self.data.state = CreateTableState::CreateMetadata;
230 Ok(Status::executing(true))
231 }
232
233 async fn on_create_metadata(&mut self, pid: ProcedureId) -> Result<Status> {
238 let table_id = self.table_id();
239 let table_ref = self.data.table_ref();
240 let manager = &self.context.table_metadata_manager;
241
242 let raw_table_info = self.table_info().clone();
243 let region_wal_options = self.region_wal_options()?.clone();
245 let physical_table_route = self.table_route()?.clone();
247 self.executor
248 .on_create_metadata(
249 manager,
250 &self.context.region_failure_detector_controller,
251 raw_table_info,
252 &self.data.column_metadatas,
253 physical_table_route,
254 region_wal_options,
255 )
256 .await?;
257
258 info!(
259 "Successfully created table: {}, table_id: {}, procedure_id: {}",
260 table_ref, table_id, pid
261 );
262
263 self.opening_regions.clear();
264 Ok(Status::done_with_output(table_id))
265 }
266
267 fn register_opening_regions(
269 &self,
270 context: &DdlContext,
271 region_routes: &[RegionRoute],
272 ) -> Result<Vec<OperatingRegionGuard>> {
273 let opening_regions = operating_leader_region_roles(region_routes);
274 if self.opening_regions.len() == opening_regions.len() {
275 return Ok(vec![]);
276 }
277
278 let mut opening_region_guards = Vec::with_capacity(opening_regions.len());
279
280 for (region_id, datanode_id, role) in opening_regions {
281 let guard = context
282 .memory_region_keeper
283 .register_with_role(datanode_id, region_id, role)
284 .context(error::RegionOperatingRaceSnafu {
285 region_id,
286 peer_id: datanode_id,
287 })?;
288 opening_region_guards.push(guard);
289 }
290 Ok(opening_region_guards)
291 }
292
293 pub fn set_allocated_metadata(
294 &mut self,
295 table_id: TableId,
296 table_route: PhysicalTableRouteValue,
297 region_wal_options: HashMap<RegionNumber, String>,
298 ) {
299 self.data.task.table_info.ident.table_id = table_id;
300 self.data.table_route = Some(table_route);
301 self.data.region_wal_options = Some(region_wal_options);
302 }
303}
304
305#[async_trait]
306impl Procedure for CreateTableProcedure {
307 fn type_name(&self) -> &str {
308 Self::TYPE_NAME
309 }
310
311 fn recover(&mut self) -> ProcedureResult<()> {
312 if let Some(x) = &self.data.table_route {
314 self.opening_regions = self
315 .register_opening_regions(&self.context, &x.region_routes)
316 .map_err(BoxedError::new)
317 .context(ExternalSnafu {
318 clean_poisons: false,
319 })?;
320 }
321
322 Ok(())
323 }
324
325 async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
326 let state = &self.data.state;
327
328 let _timer = metrics::METRIC_META_PROCEDURE_CREATE_TABLE
329 .with_label_values(&[state.as_ref()])
330 .start_timer();
331
332 match state {
333 CreateTableState::Prepare => self.on_prepare().await,
334 CreateTableState::DatanodeCreateRegions => {
335 let retrying = ctx.is_retrying().await.unwrap_or(false);
336 self.on_datanode_create_regions(retrying).await
337 }
338 CreateTableState::CreateMetadata => self.on_create_metadata(ctx.procedure_id).await,
339 }
340 .map_err(map_to_procedure_error)
341 }
342
343 fn dump(&self) -> ProcedureResult<String> {
344 serde_json::to_string(&self.data).context(ToJsonSnafu)
345 }
346
347 fn lock_key(&self) -> LockKey {
348 let table_ref = &self.data.table_ref();
349
350 LockKey::new(vec![
351 CatalogLock::Read(table_ref.catalog).into(),
352 SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
353 TableNameLock::new(table_ref.catalog, table_ref.schema, table_ref.table).into(),
354 ])
355 }
356}
357
/// Steps of [`CreateTableProcedure`], visited in declaration order.
///
/// Variant names appear in the serialized procedure state (via `Serialize`). They are
/// also used as the metric label (via `AsRefStr`), so renaming a variant affects both.
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
pub enum CreateTableState {
    /// Checks the table via the executor and allocates table id, route and WAL options.
    Prepare,
    /// Creates the table's regions on the datanodes in the table route.
    DatanodeCreateRegions,
    /// Persists the table metadata and finishes the procedure.
    CreateMetadata,
}
367
/// Serializable state of a [`CreateTableProcedure`], dumped as JSON for persistence.
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateTableData {
    /// Current step of the procedure.
    pub state: CreateTableState,
    /// The create-table request; its `table_info` receives the allocated table id.
    pub task: CreateTableTask,
    /// Falls back to `QueryContext::default()` when absent from the serialized data.
    #[serde(default)]
    pub query_context: QueryContext,
    /// Column metadata returned by the region creation step; empty before it and when
    /// absent from the serialized data.
    #[serde(default)]
    pub column_metadatas: Vec<ColumnMetadata>,
    /// Physical table route; `None` until metadata is allocated in the prepare step.
    pub(crate) table_route: Option<PhysicalTableRouteValue>,
    /// WAL options keyed by region number; allocated together with `table_route`.
    pub region_wal_options: Option<HashMap<RegionNumber, String>>,
}
381
382impl CreateTableData {
383 pub fn new(task: CreateTableTask, query_context: QueryContext) -> Self {
384 CreateTableData {
385 state: CreateTableState::Prepare,
386 column_metadatas: vec![],
387 task,
388 query_context,
389 table_route: None,
390 region_wal_options: None,
391 }
392 }
393
394 fn table_ref(&self) -> TableReference<'_> {
395 self.task.table_ref()
396 }
397}