1use std::collections::HashMap;
16
17use api::v1::region::region_request::Body as PbRegionRequest;
18use api::v1::region::{RegionRequest, RegionRequestHeader};
19use async_trait::async_trait;
20use common_error::ext::BoxedError;
21use common_procedure::error::{
22 ExternalSnafu, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
23};
24use common_procedure::{Context as ProcedureContext, LockKey, Procedure, ProcedureId, Status};
25use common_telemetry::tracing_context::TracingContext;
26use common_telemetry::{info, warn};
27use futures::future::join_all;
28use serde::{Deserialize, Serialize};
29use snafu::{OptionExt, ResultExt, ensure};
30use store_api::metadata::ColumnMetadata;
31use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY;
32use store_api::storage::{RegionId, RegionNumber};
33use strum::AsRefStr;
34use table::metadata::{RawTableInfo, TableId};
35use table::table_reference::TableReference;
36
37use crate::ddl::create_table_template::{CreateRequestBuilder, build_template};
38use crate::ddl::utils::raw_table_info::update_table_info_column_ids;
39use crate::ddl::utils::{
40 add_peer_context_if_needed, convert_region_routes_to_detecting_regions,
41 extract_column_metadatas, map_to_procedure_error, region_storage_path,
42};
43use crate::ddl::{DdlContext, TableMetadata};
44use crate::error::{self, Result};
45use crate::key::table_name::TableNameKey;
46use crate::key::table_route::{PhysicalTableRouteValue, TableRouteValue};
47use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock};
48use crate::metrics;
49use crate::region_keeper::OperatingRegionGuard;
50use crate::rpc::ddl::CreateTableTask;
51use crate::rpc::router::{
52 RegionRoute, find_leader_regions, find_leaders, operating_leader_regions,
53};
54pub struct CreateTableProcedure {
55 pub context: DdlContext,
56 pub creator: TableCreator,
57}
58
59impl CreateTableProcedure {
60 pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateTable";
61
62 pub fn new(task: CreateTableTask, context: DdlContext) -> Self {
63 Self {
64 context,
65 creator: TableCreator::new(task),
66 }
67 }
68
69 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
70 let data = serde_json::from_str(json).context(FromJsonSnafu)?;
71
72 Ok(CreateTableProcedure {
73 context,
74 creator: TableCreator {
75 data,
76 opening_regions: vec![],
77 },
78 })
79 }
80
81 fn table_info(&self) -> &RawTableInfo {
82 &self.creator.data.task.table_info
83 }
84
85 pub(crate) fn table_id(&self) -> TableId {
86 self.table_info().ident.table_id
87 }
88
89 fn region_wal_options(&self) -> Result<&HashMap<RegionNumber, String>> {
90 self.creator
91 .data
92 .region_wal_options
93 .as_ref()
94 .context(error::UnexpectedSnafu {
95 err_msg: "region_wal_options is not allocated",
96 })
97 }
98
99 fn table_route(&self) -> Result<&PhysicalTableRouteValue> {
100 self.creator
101 .data
102 .table_route
103 .as_ref()
104 .context(error::UnexpectedSnafu {
105 err_msg: "table_route is not allocated",
106 })
107 }
108
109 #[cfg(any(test, feature = "testing"))]
110 pub fn set_allocated_metadata(
111 &mut self,
112 table_id: TableId,
113 table_route: PhysicalTableRouteValue,
114 region_wal_options: HashMap<RegionNumber, String>,
115 ) {
116 self.creator
117 .set_allocated_metadata(table_id, table_route, region_wal_options)
118 }
119
120 pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
128 let expr = &self.creator.data.task.create_table;
129 let table_name_value = self
130 .context
131 .table_metadata_manager
132 .table_name_manager()
133 .get(TableNameKey::new(
134 &expr.catalog_name,
135 &expr.schema_name,
136 &expr.table_name,
137 ))
138 .await?;
139
140 if let Some(value) = table_name_value {
141 ensure!(
142 expr.create_if_not_exists,
143 error::TableAlreadyExistsSnafu {
144 table_name: self.creator.data.table_ref().to_string(),
145 }
146 );
147
148 let table_id = value.table_id();
149 return Ok(Status::done_with_output(table_id));
150 }
151
152 self.creator.data.state = CreateTableState::DatanodeCreateRegions;
153 let TableMetadata {
154 table_id,
155 table_route,
156 region_wal_options,
157 } = self
158 .context
159 .table_metadata_allocator
160 .create(&self.creator.data.task)
161 .await?;
162 self.creator
163 .set_allocated_metadata(table_id, table_route, region_wal_options);
164
165 Ok(Status::executing(true))
166 }
167
168 pub fn new_region_request_builder(
169 &self,
170 physical_table_id: Option<TableId>,
171 ) -> Result<CreateRequestBuilder> {
172 let create_table_expr = &self.creator.data.task.create_table;
173 let template = build_template(create_table_expr)?;
174 Ok(CreateRequestBuilder::new(template, physical_table_id))
175 }
176
177 pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
189 let table_route = self.table_route()?.clone();
190 let request_builder = self.new_region_request_builder(None)?;
191 let guards = self
193 .creator
194 .register_opening_regions(&self.context, &table_route.region_routes)?;
195 if !guards.is_empty() {
196 self.creator.opening_regions = guards;
197 }
198 self.create_regions(&table_route.region_routes, request_builder)
199 .await
200 }
201
202 async fn create_regions(
203 &mut self,
204 region_routes: &[RegionRoute],
205 request_builder: CreateRequestBuilder,
206 ) -> Result<Status> {
207 let create_table_data = &self.creator.data;
208 let region_wal_options = self.region_wal_options()?;
210 let create_table_expr = &create_table_data.task.create_table;
211 let catalog = &create_table_expr.catalog_name;
212 let schema = &create_table_expr.schema_name;
213 let storage_path = region_storage_path(catalog, schema);
214 let leaders = find_leaders(region_routes);
215 let mut create_region_tasks = Vec::with_capacity(leaders.len());
216
217 let partition_exprs = region_routes
218 .iter()
219 .map(|r| (r.region.id.region_number(), r.region.partition_expr()))
220 .collect();
221
222 for datanode in leaders {
223 let requester = self.context.node_manager.datanode(&datanode).await;
224
225 let regions = find_leader_regions(region_routes, &datanode);
226 let mut requests = Vec::with_capacity(regions.len());
227 for region_number in regions {
228 let region_id = RegionId::new(self.table_id(), region_number);
229 let create_region_request = request_builder.build_one(
230 region_id,
231 storage_path.clone(),
232 region_wal_options,
233 &partition_exprs,
234 );
235 requests.push(PbRegionRequest::Create(create_region_request));
236 }
237
238 for request in requests {
239 let request = RegionRequest {
240 header: Some(RegionRequestHeader {
241 tracing_context: TracingContext::from_current_span().to_w3c(),
242 ..Default::default()
243 }),
244 body: Some(request),
245 };
246
247 let datanode = datanode.clone();
248 let requester = requester.clone();
249 create_region_tasks.push(async move {
250 requester
251 .handle(request)
252 .await
253 .map_err(add_peer_context_if_needed(datanode))
254 });
255 }
256 }
257
258 let mut results = join_all(create_region_tasks)
259 .await
260 .into_iter()
261 .collect::<Result<Vec<_>>>()?;
262
263 if let Some(column_metadatas) =
264 extract_column_metadatas(&mut results, TABLE_COLUMN_METADATA_EXTENSION_KEY)?
265 {
266 self.creator.data.column_metadatas = column_metadatas;
267 } else {
268 warn!(
269 "creating table result doesn't contains extension key `{TABLE_COLUMN_METADATA_EXTENSION_KEY}`,leaving the table's column metadata unchanged"
270 );
271 }
272
273 self.creator.data.state = CreateTableState::CreateMetadata;
274 Ok(Status::executing(true))
275 }
276
277 async fn on_create_metadata(&mut self, pid: ProcedureId) -> Result<Status> {
282 let table_id = self.table_id();
283 let table_ref = self.creator.data.table_ref();
284 let manager = &self.context.table_metadata_manager;
285
286 let mut raw_table_info = self.table_info().clone();
287 if !self.creator.data.column_metadatas.is_empty() {
288 update_table_info_column_ids(&mut raw_table_info, &self.creator.data.column_metadatas);
289 }
290 let region_wal_options = self.region_wal_options()?.clone();
292 let physical_table_route = self.table_route()?.clone();
294 let detecting_regions =
295 convert_region_routes_to_detecting_regions(&physical_table_route.region_routes);
296 let table_route = TableRouteValue::Physical(physical_table_route);
297 manager
298 .create_table_metadata(raw_table_info, table_route, region_wal_options)
299 .await?;
300 self.context
301 .register_failure_detectors(detecting_regions)
302 .await;
303 info!(
304 "Successfully created table: {}, table_id: {}, procedure_id: {}",
305 table_ref, table_id, pid
306 );
307
308 self.creator.opening_regions.clear();
309 Ok(Status::done_with_output(table_id))
310 }
311}
312
313#[async_trait]
314impl Procedure for CreateTableProcedure {
315 fn type_name(&self) -> &str {
316 Self::TYPE_NAME
317 }
318
319 fn recover(&mut self) -> ProcedureResult<()> {
320 if let Some(x) = &self.creator.data.table_route {
322 self.creator.opening_regions = self
323 .creator
324 .register_opening_regions(&self.context, &x.region_routes)
325 .map_err(BoxedError::new)
326 .context(ExternalSnafu {
327 clean_poisons: false,
328 })?;
329 }
330
331 Ok(())
332 }
333
334 async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
335 let state = &self.creator.data.state;
336
337 let _timer = metrics::METRIC_META_PROCEDURE_CREATE_TABLE
338 .with_label_values(&[state.as_ref()])
339 .start_timer();
340
341 match state {
342 CreateTableState::Prepare => self.on_prepare().await,
343 CreateTableState::DatanodeCreateRegions => self.on_datanode_create_regions().await,
344 CreateTableState::CreateMetadata => self.on_create_metadata(ctx.procedure_id).await,
345 }
346 .map_err(map_to_procedure_error)
347 }
348
349 fn dump(&self) -> ProcedureResult<String> {
350 serde_json::to_string(&self.creator.data).context(ToJsonSnafu)
351 }
352
353 fn lock_key(&self) -> LockKey {
354 let table_ref = &self.creator.data.table_ref();
355
356 LockKey::new(vec![
357 CatalogLock::Read(table_ref.catalog).into(),
358 SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
359 TableNameLock::new(table_ref.catalog, table_ref.schema, table_ref.table).into(),
360 ])
361 }
362}
363
364pub struct TableCreator {
365 pub data: CreateTableData,
367 pub opening_regions: Vec<OperatingRegionGuard>,
369}
370
371impl TableCreator {
372 pub fn new(task: CreateTableTask) -> Self {
373 Self {
374 data: CreateTableData {
375 state: CreateTableState::Prepare,
376 column_metadatas: vec![],
377 task,
378 table_route: None,
379 region_wal_options: None,
380 },
381 opening_regions: vec![],
382 }
383 }
384
385 fn register_opening_regions(
387 &self,
388 context: &DdlContext,
389 region_routes: &[RegionRoute],
390 ) -> Result<Vec<OperatingRegionGuard>> {
391 let opening_regions = operating_leader_regions(region_routes);
392
393 if self.opening_regions.len() == opening_regions.len() {
394 return Ok(vec![]);
395 }
396
397 let mut opening_region_guards = Vec::with_capacity(opening_regions.len());
398
399 for (region_id, datanode_id) in opening_regions {
400 let guard = context
401 .memory_region_keeper
402 .register(datanode_id, region_id)
403 .context(error::RegionOperatingRaceSnafu {
404 region_id,
405 peer_id: datanode_id,
406 })?;
407 opening_region_guards.push(guard);
408 }
409 Ok(opening_region_guards)
410 }
411
412 fn set_allocated_metadata(
413 &mut self,
414 table_id: TableId,
415 table_route: PhysicalTableRouteValue,
416 region_wal_options: HashMap<RegionNumber, String>,
417 ) {
418 self.data.task.table_info.ident.table_id = table_id;
419 self.data.table_route = Some(table_route);
420 self.data.region_wal_options = Some(region_wal_options);
421 }
422}
423
424#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
425pub enum CreateTableState {
426 Prepare,
428 DatanodeCreateRegions,
430 CreateMetadata,
432}
433
434#[derive(Debug, Serialize, Deserialize)]
435pub struct CreateTableData {
436 pub state: CreateTableState,
437 pub task: CreateTableTask,
438 #[serde(default)]
439 pub column_metadatas: Vec<ColumnMetadata>,
440 table_route: Option<PhysicalTableRouteValue>,
442 pub region_wal_options: Option<HashMap<RegionNumber, String>>,
444}
445
446impl CreateTableData {
447 fn table_ref(&self) -> TableReference<'_> {
448 self.task.table_ref()
449 }
450}