1pub mod executor;
16pub mod template;
17
18use api::v1::CreateTableExpr;
19use async_trait::async_trait;
20use common_error::ext::BoxedError;
21use common_procedure::error::{
22 ExternalSnafu, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
23};
24use common_procedure::local::DynamicKeyLockGuard;
25use common_procedure::{Context as ProcedureContext, LockKey, Procedure, ProcedureId, Status};
26use common_telemetry::info;
27use serde::{Deserialize, Serialize};
28use snafu::{OptionExt, ResultExt};
29use store_api::metadata::ColumnMetadata;
30use strum::AsRefStr;
31use table::metadata::{TableId, TableInfo};
32use table::table_name::TableName;
33use table::table_reference::TableReference;
34pub(crate) use template::{CreateRequestBuilder, build_template_from_raw_table_info};
35
36use crate::ddl::create_table::executor::CreateTableExecutor;
37use crate::ddl::create_table::template::build_template;
38use crate::ddl::utils::map_to_procedure_error;
39use crate::ddl::{DdlContext, TableMetadata};
40use crate::error::{self, Result};
41use crate::key::table_route::PhysicalTableRouteValue;
42use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock};
43use crate::metrics;
44use crate::peer::PeerAllocContext;
45use crate::region_keeper::OperatingRegionGuard;
46use crate::rpc::ddl::{CreateTableTask, QueryContext};
47use crate::rpc::router::{RegionRoute, operating_leader_region_roles};
48use crate::wal_provider::{
49 RegionWalOptions, acquire_remote_wal_read_locks, optional_region_wal_options_serde,
50 refresh_initial_pruned_entry_ids,
51};
52
/// Procedure that creates a table in three steps (see [`CreateTableState`]).
///
/// Only [`CreateTableData`] is serialized by `dump`; every other field is
/// runtime state that `from_json` re-creates from scratch.
pub struct CreateTableProcedure {
    /// Shared DDL context: metadata manager and allocator, node manager,
    /// memory region keeper, region failure detector controller.
    pub context: DdlContext,
    /// Serializable state of the procedure.
    pub data: CreateTableData,
    /// Guards obtained from the memory region keeper for the regions being
    /// opened; cleared once the table metadata has been created.
    pub opening_regions: Vec<OperatingRegionGuard>,
    /// Executor that performs the individual creation steps.
    pub executor: CreateTableExecutor,
    /// Guards returned by `acquire_remote_wal_read_locks`; cleared once the
    /// table metadata has been created.
    remote_wal_lock_guards: Vec<DynamicKeyLockGuard>,
}
64
65fn build_executor_from_create_table_data(
66 create_table_expr: &CreateTableExpr,
67) -> Result<CreateTableExecutor> {
68 let template = build_template(create_table_expr)?;
69 let builder = CreateRequestBuilder::new(template, None);
70 let table_name = TableName::new(
71 create_table_expr.catalog_name.clone(),
72 create_table_expr.schema_name.clone(),
73 create_table_expr.table_name.clone(),
74 );
75 let executor =
76 CreateTableExecutor::new(table_name, create_table_expr.create_if_not_exists, builder);
77 Ok(executor)
78}
79
80impl CreateTableProcedure {
    /// Type name reported by [`Procedure::type_name`] for this procedure.
    pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateTable";
82
83 pub fn new(task: CreateTableTask, context: DdlContext) -> Result<Self> {
84 Self::new_with_query_context(task, QueryContext::default(), context)
85 }
86
87 pub fn new_with_query_context(
88 task: CreateTableTask,
89 query_context: QueryContext,
90 context: DdlContext,
91 ) -> Result<Self> {
92 let executor = build_executor_from_create_table_data(&task.create_table)?;
93
94 Ok(Self {
95 context,
96 data: CreateTableData::new(task, query_context),
97 opening_regions: vec![],
98 executor,
99 remote_wal_lock_guards: vec![],
100 })
101 }
102
103 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
104 let data: CreateTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
105 let create_table_expr = &data.task.create_table;
106 let executor = build_executor_from_create_table_data(create_table_expr)
107 .map_err(BoxedError::new)
108 .context(ExternalSnafu {
109 clean_poisons: false,
110 })?;
111
112 Ok(CreateTableProcedure {
113 context,
114 data,
115 opening_regions: vec![],
116 executor,
117 remote_wal_lock_guards: vec![],
118 })
119 }
120
121 fn table_info(&self) -> &TableInfo {
122 &self.data.task.table_info
123 }
124
125 pub(crate) fn table_id(&self) -> TableId {
126 self.table_info().ident.table_id
127 }
128
129 fn region_wal_options(&self) -> Result<&RegionWalOptions> {
130 self.data
131 .region_wal_options
132 .as_ref()
133 .context(error::UnexpectedSnafu {
134 err_msg: "region_wal_options is not allocated",
135 })
136 }
137
138 fn table_route(&self) -> Result<&PhysicalTableRouteValue> {
139 self.data
140 .table_route
141 .as_ref()
142 .context(error::UnexpectedSnafu {
143 err_msg: "table_route is not allocated",
144 })
145 }
146
147 pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
155 let table_id = self
156 .executor
157 .on_prepare(&self.context.table_metadata_manager)
158 .await?;
159 if let Some(table_id) = table_id {
161 return Ok(Status::done_with_output(table_id));
162 }
163
164 self.data.state = CreateTableState::DatanodeCreateRegions;
165 let TableMetadata {
166 table_id,
167 table_route,
168 region_wal_options,
169 } = self
170 .context
171 .table_metadata_allocator
172 .create_with_context(
173 &self.data.task,
174 &PeerAllocContext {
175 extensions: self.data.query_context.extensions.clone(),
176 },
177 )
178 .await?;
179 self.set_allocated_metadata(table_id, table_route, region_wal_options);
180
181 Ok(Status::executing(true))
182 }
183
184 async fn ensure_remote_wal_read_locks(&mut self, ctx: &ProcedureContext) -> Result<()> {
185 if !self.remote_wal_lock_guards.is_empty() {
186 return Ok(());
187 }
188
189 self.remote_wal_lock_guards =
190 acquire_remote_wal_read_locks(ctx, self.region_wal_options()?).await;
191
192 Ok(())
193 }
194
195 async fn refresh_initial_pruned_entry_ids(&mut self) -> Result<()> {
196 let region_wal_options =
197 self.data
198 .region_wal_options
199 .as_mut()
200 .context(error::UnexpectedSnafu {
201 err_msg: "region_wal_options is not allocated",
202 })?;
203 refresh_initial_pruned_entry_ids(&self.context.table_metadata_manager, region_wal_options)
204 .await
205 }
206
207 pub async fn on_datanode_create_regions(&mut self, retrying: bool) -> Result<Status> {
219 let mut table_route = self.table_route()?.clone();
220 if retrying {
221 info!(
222 "Remapping region routes addresses for retrying create regions for table: {}",
223 self.data.table_ref()
224 );
225 let storage = self
226 .context
227 .table_metadata_manager
228 .table_route_manager()
229 .table_route_storage();
230 storage
233 .remap_region_routes(&mut table_route.region_routes)
234 .await?;
235 }
236 let guards = self.register_opening_regions(&self.context, &table_route.region_routes)?;
238 if !guards.is_empty() {
239 self.opening_regions = guards;
240 }
241 self.create_regions(&table_route.region_routes).await
242 }
243
244 async fn create_regions(&mut self, region_routes: &[RegionRoute]) -> Result<Status> {
245 let table_id = self.table_id();
246 let region_wal_options = self.region_wal_options()?;
247 let column_metadatas = self
248 .executor
249 .on_create_regions(
250 &self.context.node_manager,
251 table_id,
252 region_routes,
253 region_wal_options,
254 )
255 .await?;
256
257 self.data.column_metadatas = column_metadatas;
258 self.data.state = CreateTableState::CreateMetadata;
259 Ok(Status::executing(true))
260 }
261
262 async fn on_create_metadata(&mut self, pid: ProcedureId) -> Result<Status> {
267 let table_id = self.table_id();
268 let table_ref = self.data.table_ref();
269 let manager = &self.context.table_metadata_manager;
270
271 let raw_table_info = self.table_info().clone();
272 let region_wal_options = self.region_wal_options()?.clone();
274 let physical_table_route = self.table_route()?.clone();
276 self.executor
277 .on_create_metadata(
278 manager,
279 &self.context.region_failure_detector_controller,
280 raw_table_info,
281 &self.data.column_metadatas,
282 physical_table_route,
283 region_wal_options,
284 )
285 .await?;
286
287 info!(
288 "Successfully created table: {}, table_id: {}, procedure_id: {}",
289 table_ref, table_id, pid
290 );
291
292 self.opening_regions.clear();
293 self.remote_wal_lock_guards.clear();
294 Ok(Status::done_with_output(table_id))
295 }
296
297 fn register_opening_regions(
299 &self,
300 context: &DdlContext,
301 region_routes: &[RegionRoute],
302 ) -> Result<Vec<OperatingRegionGuard>> {
303 let opening_regions = operating_leader_region_roles(region_routes);
304 if self.opening_regions.len() == opening_regions.len() {
305 return Ok(vec![]);
306 }
307
308 let mut opening_region_guards = Vec::with_capacity(opening_regions.len());
309
310 for (region_id, datanode_id, role) in opening_regions {
311 let guard = context
312 .memory_region_keeper
313 .register_with_role(datanode_id, region_id, role)
314 .context(error::RegionOperatingRaceSnafu {
315 region_id,
316 peer_id: datanode_id,
317 })?;
318 opening_region_guards.push(guard);
319 }
320 Ok(opening_region_guards)
321 }
322
323 pub fn set_allocated_metadata(
324 &mut self,
325 table_id: TableId,
326 table_route: PhysicalTableRouteValue,
327 region_wal_options: RegionWalOptions,
328 ) {
329 self.data.task.table_info.ident.table_id = table_id;
330 self.data.table_route = Some(table_route);
331 self.data.region_wal_options = Some(region_wal_options);
332 }
333}
334
335#[async_trait]
336impl Procedure for CreateTableProcedure {
337 fn type_name(&self) -> &str {
338 Self::TYPE_NAME
339 }
340
341 fn recover(&mut self) -> ProcedureResult<()> {
342 if let Some(x) = &self.data.table_route {
344 self.opening_regions = self
345 .register_opening_regions(&self.context, &x.region_routes)
346 .map_err(BoxedError::new)
347 .context(ExternalSnafu {
348 clean_poisons: false,
349 })?;
350 }
351
352 Ok(())
353 }
354
355 async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
356 let state = &self.data.state;
357
358 let _timer = metrics::METRIC_META_PROCEDURE_CREATE_TABLE
359 .with_label_values(&[state.as_ref()])
360 .start_timer();
361
362 match state {
363 CreateTableState::Prepare => self.on_prepare().await,
364 CreateTableState::DatanodeCreateRegions => {
365 async {
366 self.ensure_remote_wal_read_locks(ctx).await?;
367 self.refresh_initial_pruned_entry_ids().await?;
368 let retrying = ctx.is_retrying().await.unwrap_or(false);
369 self.on_datanode_create_regions(retrying).await
370 }
371 .await
372 }
373 CreateTableState::CreateMetadata => {
374 async {
375 self.ensure_remote_wal_read_locks(ctx).await?;
376 self.on_create_metadata(ctx.procedure_id).await
377 }
378 .await
379 }
380 }
381 .map_err(map_to_procedure_error)
382 }
383
384 fn dump(&self) -> ProcedureResult<String> {
385 serde_json::to_string(&self.data).context(ToJsonSnafu)
386 }
387
388 fn lock_key(&self) -> LockKey {
389 let table_ref = &self.data.table_ref();
390
391 LockKey::new(vec![
392 CatalogLock::Read(table_ref.catalog).into(),
393 SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
394 TableNameLock::new(table_ref.catalog, table_ref.schema, table_ref.table).into(),
395 ])
396 }
397}
398
/// Steps of [`CreateTableProcedure`].
///
/// The variant name (via `AsRefStr`) is used as the label of the procedure metric.
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
pub enum CreateTableState {
    /// Initial state; handled by `on_prepare`.
    Prepare,
    /// Handled by `on_datanode_create_regions`.
    DatanodeCreateRegions,
    /// Handled by `on_create_metadata`; completing it finishes the procedure.
    CreateMetadata,
}
408
/// Serializable state of [`CreateTableProcedure`], written by `dump` and read by `from_json`.
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateTableData {
    /// Step the procedure executes next.
    pub state: CreateTableState,
    /// The create-table task; its table info receives the allocated table id.
    pub task: CreateTableTask,
    /// Query context of the request; falls back to the default when absent
    /// from the serialized form.
    #[serde(default)]
    pub query_context: QueryContext,
    /// Column metadata returned by region creation; empty until then.
    #[serde(default)]
    pub column_metadatas: Vec<ColumnMetadata>,
    /// Physical table route; `None` until metadata has been allocated.
    pub(crate) table_route: Option<PhysicalTableRouteValue>,
    /// Region WAL options; `None` until metadata has been allocated.
    /// (De)serialized through `optional_region_wal_options_serde`.
    #[serde(default)]
    #[serde(with = "optional_region_wal_options_serde")]
    pub region_wal_options: Option<RegionWalOptions>,
}
424
425impl CreateTableData {
426 pub fn new(task: CreateTableTask, query_context: QueryContext) -> Self {
427 CreateTableData {
428 state: CreateTableState::Prepare,
429 column_metadatas: vec![],
430 task,
431 query_context,
432 table_route: None,
433 region_wal_options: None,
434 }
435 }
436
437 fn table_ref(&self) -> TableReference<'_> {
438 self.task.table_ref()
439 }
440}