1use std::collections::HashMap;
16
17use api::v1::region::region_request::Body as PbRegionRequest;
18use api::v1::region::{RegionRequest, RegionRequestHeader};
19use async_trait::async_trait;
20use common_error::ext::BoxedError;
21use common_procedure::error::{
22 ExternalSnafu, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
23};
24use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
25use common_telemetry::info;
26use common_telemetry::tracing_context::TracingContext;
27use futures::future::join_all;
28use serde::{Deserialize, Serialize};
29use snafu::{ensure, OptionExt, ResultExt};
30use store_api::storage::{RegionId, RegionNumber};
31use strum::AsRefStr;
32use table::metadata::{RawTableInfo, TableId};
33use table::table_reference::TableReference;
34
35use crate::ddl::create_table_template::{build_template, CreateRequestBuilder};
36use crate::ddl::utils::{
37 add_peer_context_if_needed, convert_region_routes_to_detecting_regions, handle_retry_error,
38 region_storage_path,
39};
40use crate::ddl::{DdlContext, TableMetadata};
41use crate::error::{self, Result};
42use crate::key::table_name::TableNameKey;
43use crate::key::table_route::{PhysicalTableRouteValue, TableRouteValue};
44use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock};
45use crate::metrics;
46use crate::region_keeper::OperatingRegionGuard;
47use crate::rpc::ddl::CreateTableTask;
48use crate::rpc::router::{
49 find_leader_regions, find_leaders, operating_leader_regions, RegionRoute,
50};
51pub struct CreateTableProcedure {
52 pub context: DdlContext,
53 pub creator: TableCreator,
54}
55
56impl CreateTableProcedure {
57 pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateTable";
58
59 pub fn new(task: CreateTableTask, context: DdlContext) -> Self {
60 Self {
61 context,
62 creator: TableCreator::new(task),
63 }
64 }
65
66 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
67 let data = serde_json::from_str(json).context(FromJsonSnafu)?;
68
69 Ok(CreateTableProcedure {
70 context,
71 creator: TableCreator {
72 data,
73 opening_regions: vec![],
74 },
75 })
76 }
77
78 fn table_info(&self) -> &RawTableInfo {
79 &self.creator.data.task.table_info
80 }
81
82 pub(crate) fn table_id(&self) -> TableId {
83 self.table_info().ident.table_id
84 }
85
86 fn region_wal_options(&self) -> Result<&HashMap<RegionNumber, String>> {
87 self.creator
88 .data
89 .region_wal_options
90 .as_ref()
91 .context(error::UnexpectedSnafu {
92 err_msg: "region_wal_options is not allocated",
93 })
94 }
95
96 fn table_route(&self) -> Result<&PhysicalTableRouteValue> {
97 self.creator
98 .data
99 .table_route
100 .as_ref()
101 .context(error::UnexpectedSnafu {
102 err_msg: "table_route is not allocated",
103 })
104 }
105
106 #[cfg(any(test, feature = "testing"))]
107 pub fn set_allocated_metadata(
108 &mut self,
109 table_id: TableId,
110 table_route: PhysicalTableRouteValue,
111 region_wal_options: HashMap<RegionNumber, String>,
112 ) {
113 self.creator
114 .set_allocated_metadata(table_id, table_route, region_wal_options)
115 }
116
117 pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
125 let expr = &self.creator.data.task.create_table;
126 let table_name_value = self
127 .context
128 .table_metadata_manager
129 .table_name_manager()
130 .get(TableNameKey::new(
131 &expr.catalog_name,
132 &expr.schema_name,
133 &expr.table_name,
134 ))
135 .await?;
136
137 if let Some(value) = table_name_value {
138 ensure!(
139 expr.create_if_not_exists,
140 error::TableAlreadyExistsSnafu {
141 table_name: self.creator.data.table_ref().to_string(),
142 }
143 );
144
145 let table_id = value.table_id();
146 return Ok(Status::done_with_output(table_id));
147 }
148
149 self.creator.data.state = CreateTableState::DatanodeCreateRegions;
150 let TableMetadata {
151 table_id,
152 table_route,
153 region_wal_options,
154 } = self
155 .context
156 .table_metadata_allocator
157 .create(&self.creator.data.task)
158 .await?;
159 self.creator
160 .set_allocated_metadata(table_id, table_route, region_wal_options);
161
162 Ok(Status::executing(true))
163 }
164
165 pub fn new_region_request_builder(
166 &self,
167 physical_table_id: Option<TableId>,
168 ) -> Result<CreateRequestBuilder> {
169 let create_table_expr = &self.creator.data.task.create_table;
170 let template = build_template(create_table_expr)?;
171 Ok(CreateRequestBuilder::new(template, physical_table_id))
172 }
173
174 pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
186 let table_route = self.table_route()?.clone();
187 let request_builder = self.new_region_request_builder(None)?;
188 let guards = self
190 .creator
191 .register_opening_regions(&self.context, &table_route.region_routes)?;
192 if !guards.is_empty() {
193 self.creator.opening_regions = guards;
194 }
195 self.create_regions(&table_route.region_routes, request_builder)
196 .await
197 }
198
199 async fn create_regions(
200 &mut self,
201 region_routes: &[RegionRoute],
202 request_builder: CreateRequestBuilder,
203 ) -> Result<Status> {
204 let create_table_data = &self.creator.data;
205 let region_wal_options = self.region_wal_options()?;
207 let create_table_expr = &create_table_data.task.create_table;
208 let catalog = &create_table_expr.catalog_name;
209 let schema = &create_table_expr.schema_name;
210 let storage_path = region_storage_path(catalog, schema);
211 let leaders = find_leaders(region_routes);
212 let mut create_region_tasks = Vec::with_capacity(leaders.len());
213
214 for datanode in leaders {
215 let requester = self.context.node_manager.datanode(&datanode).await;
216
217 let regions = find_leader_regions(region_routes, &datanode);
218 let mut requests = Vec::with_capacity(regions.len());
219 for region_number in regions {
220 let region_id = RegionId::new(self.table_id(), region_number);
221 let create_region_request = request_builder.build_one(
222 region_id,
223 storage_path.clone(),
224 region_wal_options,
225 )?;
226 requests.push(PbRegionRequest::Create(create_region_request));
227 }
228
229 for request in requests {
230 let request = RegionRequest {
231 header: Some(RegionRequestHeader {
232 tracing_context: TracingContext::from_current_span().to_w3c(),
233 ..Default::default()
234 }),
235 body: Some(request),
236 };
237
238 let datanode = datanode.clone();
239 let requester = requester.clone();
240 create_region_tasks.push(async move {
241 requester
242 .handle(request)
243 .await
244 .map_err(add_peer_context_if_needed(datanode))
245 });
246 }
247 }
248
249 join_all(create_region_tasks)
250 .await
251 .into_iter()
252 .collect::<Result<Vec<_>>>()?;
253
254 self.creator.data.state = CreateTableState::CreateMetadata;
255
256 Ok(Status::executing(true))
258 }
259
260 async fn on_create_metadata(&mut self) -> Result<Status> {
265 let table_id = self.table_id();
266 let manager = &self.context.table_metadata_manager;
267
268 let raw_table_info = self.table_info().clone();
269 let region_wal_options = self.region_wal_options()?.clone();
271 let physical_table_route = self.table_route()?.clone();
273 let detecting_regions =
274 convert_region_routes_to_detecting_regions(&physical_table_route.region_routes);
275 let table_route = TableRouteValue::Physical(physical_table_route);
276 manager
277 .create_table_metadata(raw_table_info, table_route, region_wal_options)
278 .await?;
279 self.context
280 .register_failure_detectors(detecting_regions)
281 .await;
282 info!("Created table metadata for table {table_id}");
283
284 self.creator.opening_regions.clear();
285 Ok(Status::done_with_output(table_id))
286 }
287}
288
289#[async_trait]
290impl Procedure for CreateTableProcedure {
291 fn type_name(&self) -> &str {
292 Self::TYPE_NAME
293 }
294
295 fn recover(&mut self) -> ProcedureResult<()> {
296 if let Some(x) = &self.creator.data.table_route {
298 self.creator.opening_regions = self
299 .creator
300 .register_opening_regions(&self.context, &x.region_routes)
301 .map_err(BoxedError::new)
302 .context(ExternalSnafu {
303 clean_poisons: false,
304 })?;
305 }
306
307 Ok(())
308 }
309
310 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
311 let state = &self.creator.data.state;
312
313 let _timer = metrics::METRIC_META_PROCEDURE_CREATE_TABLE
314 .with_label_values(&[state.as_ref()])
315 .start_timer();
316
317 match state {
318 CreateTableState::Prepare => self.on_prepare().await,
319 CreateTableState::DatanodeCreateRegions => self.on_datanode_create_regions().await,
320 CreateTableState::CreateMetadata => self.on_create_metadata().await,
321 }
322 .map_err(handle_retry_error)
323 }
324
325 fn dump(&self) -> ProcedureResult<String> {
326 serde_json::to_string(&self.creator.data).context(ToJsonSnafu)
327 }
328
329 fn lock_key(&self) -> LockKey {
330 let table_ref = &self.creator.data.table_ref();
331
332 LockKey::new(vec![
333 CatalogLock::Read(table_ref.catalog).into(),
334 SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
335 TableNameLock::new(table_ref.catalog, table_ref.schema, table_ref.table).into(),
336 ])
337 }
338}
339
340pub struct TableCreator {
341 pub data: CreateTableData,
343 pub opening_regions: Vec<OperatingRegionGuard>,
345}
346
347impl TableCreator {
348 pub fn new(task: CreateTableTask) -> Self {
349 Self {
350 data: CreateTableData {
351 state: CreateTableState::Prepare,
352 task,
353 table_route: None,
354 region_wal_options: None,
355 },
356 opening_regions: vec![],
357 }
358 }
359
360 fn register_opening_regions(
362 &self,
363 context: &DdlContext,
364 region_routes: &[RegionRoute],
365 ) -> Result<Vec<OperatingRegionGuard>> {
366 let opening_regions = operating_leader_regions(region_routes);
367
368 if self.opening_regions.len() == opening_regions.len() {
369 return Ok(vec![]);
370 }
371
372 let mut opening_region_guards = Vec::with_capacity(opening_regions.len());
373
374 for (region_id, datanode_id) in opening_regions {
375 let guard = context
376 .memory_region_keeper
377 .register(datanode_id, region_id)
378 .context(error::RegionOperatingRaceSnafu {
379 region_id,
380 peer_id: datanode_id,
381 })?;
382 opening_region_guards.push(guard);
383 }
384 Ok(opening_region_guards)
385 }
386
387 fn set_allocated_metadata(
388 &mut self,
389 table_id: TableId,
390 table_route: PhysicalTableRouteValue,
391 region_wal_options: HashMap<RegionNumber, String>,
392 ) {
393 self.data.task.table_info.ident.table_id = table_id;
394 self.data.table_route = Some(table_route);
395 self.data.region_wal_options = Some(region_wal_options);
396 }
397}
398
399#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
400pub enum CreateTableState {
401 Prepare,
403 DatanodeCreateRegions,
405 CreateMetadata,
407}
408
409#[derive(Debug, Serialize, Deserialize)]
410pub struct CreateTableData {
411 pub state: CreateTableState,
412 pub task: CreateTableTask,
413 table_route: Option<PhysicalTableRouteValue>,
415 pub region_wal_options: Option<HashMap<RegionNumber, String>>,
417}
418
419impl CreateTableData {
420 fn table_ref(&self) -> TableReference<'_> {
421 self.task.table_ref()
422 }
423}