Skip to main content

common_meta/ddl/
create_table.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15pub mod executor;
16pub mod template;
17
18use std::collections::HashMap;
19
20use api::v1::CreateTableExpr;
21use async_trait::async_trait;
22use common_error::ext::BoxedError;
23use common_procedure::error::{
24    ExternalSnafu, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
25};
26use common_procedure::{Context as ProcedureContext, LockKey, Procedure, ProcedureId, Status};
27use common_telemetry::info;
28use serde::{Deserialize, Serialize};
29use snafu::{OptionExt, ResultExt};
30use store_api::metadata::ColumnMetadata;
31use store_api::storage::RegionNumber;
32use strum::AsRefStr;
33use table::metadata::{TableId, TableInfo};
34use table::table_name::TableName;
35use table::table_reference::TableReference;
36pub(crate) use template::{CreateRequestBuilder, build_template_from_raw_table_info};
37
38use crate::ddl::create_table::executor::CreateTableExecutor;
39use crate::ddl::create_table::template::build_template;
40use crate::ddl::utils::map_to_procedure_error;
41use crate::ddl::{DdlContext, TableMetadata};
42use crate::error::{self, Result};
43use crate::key::table_route::PhysicalTableRouteValue;
44use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock};
45use crate::metrics;
46use crate::region_keeper::OperatingRegionGuard;
47use crate::rpc::ddl::CreateTableTask;
48use crate::rpc::router::{RegionRoute, operating_leader_region_roles};
49
50pub struct CreateTableProcedure {
51    pub context: DdlContext,
52    /// The serializable data.
53    pub data: CreateTableData,
54    /// The guards of opening.
55    pub opening_regions: Vec<OperatingRegionGuard>,
56    /// The executor of the procedure.
57    pub executor: CreateTableExecutor,
58}
59
60fn build_executor_from_create_table_data(
61    create_table_expr: &CreateTableExpr,
62) -> Result<CreateTableExecutor> {
63    let template = build_template(create_table_expr)?;
64    let builder = CreateRequestBuilder::new(template, None);
65    let table_name = TableName::new(
66        create_table_expr.catalog_name.clone(),
67        create_table_expr.schema_name.clone(),
68        create_table_expr.table_name.clone(),
69    );
70    let executor =
71        CreateTableExecutor::new(table_name, create_table_expr.create_if_not_exists, builder);
72    Ok(executor)
73}
74
75impl CreateTableProcedure {
76    pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateTable";
77
78    pub fn new(task: CreateTableTask, context: DdlContext) -> Result<Self> {
79        let executor = build_executor_from_create_table_data(&task.create_table)?;
80
81        Ok(Self {
82            context,
83            data: CreateTableData::new(task),
84            opening_regions: vec![],
85            executor,
86        })
87    }
88
89    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
90        let data: CreateTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
91        let create_table_expr = &data.task.create_table;
92        let executor = build_executor_from_create_table_data(create_table_expr)
93            .map_err(BoxedError::new)
94            .context(ExternalSnafu {
95                clean_poisons: false,
96            })?;
97
98        Ok(CreateTableProcedure {
99            context,
100            data,
101            opening_regions: vec![],
102            executor,
103        })
104    }
105
106    fn table_info(&self) -> &TableInfo {
107        &self.data.task.table_info
108    }
109
110    pub(crate) fn table_id(&self) -> TableId {
111        self.table_info().ident.table_id
112    }
113
114    fn region_wal_options(&self) -> Result<&HashMap<RegionNumber, String>> {
115        self.data
116            .region_wal_options
117            .as_ref()
118            .context(error::UnexpectedSnafu {
119                err_msg: "region_wal_options is not allocated",
120            })
121    }
122
123    fn table_route(&self) -> Result<&PhysicalTableRouteValue> {
124        self.data
125            .table_route
126            .as_ref()
127            .context(error::UnexpectedSnafu {
128                err_msg: "table_route is not allocated",
129            })
130    }
131
132    /// On the prepare step, it performs:
133    /// - Checks whether the table exists.
134    /// - Allocates the table id.
135    ///
136    /// Abort(non-retry):
137    /// - TableName exists and `create_if_not_exists` is false.
138    /// - Failed to allocate [TableMetadata].
139    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
140        let table_id = self
141            .executor
142            .on_prepare(&self.context.table_metadata_manager)
143            .await?;
144        // Return the table id if the table already exists.
145        if let Some(table_id) = table_id {
146            return Ok(Status::done_with_output(table_id));
147        }
148
149        self.data.state = CreateTableState::DatanodeCreateRegions;
150        let TableMetadata {
151            table_id,
152            table_route,
153            region_wal_options,
154        } = self
155            .context
156            .table_metadata_allocator
157            .create(&self.data.task)
158            .await?;
159        self.set_allocated_metadata(table_id, table_route, region_wal_options);
160
161        Ok(Status::executing(true))
162    }
163
164    /// Creates regions on datanodes
165    ///
166    /// Abort(non-retry):
167    /// - Failed to create [CreateRequestBuilder].
168    /// - Failed to get the table route of physical table (for logical table).
169    ///
170    /// Retry:
171    /// - If the underlying servers returns one of the following [Code](tonic::status::Code):
172    ///   - [Code::Cancelled](tonic::status::Code::Cancelled)
173    ///   - [Code::DeadlineExceeded](tonic::status::Code::DeadlineExceeded)
174    ///   - [Code::Unavailable](tonic::status::Code::Unavailable)
175    pub async fn on_datanode_create_regions(&mut self, retrying: bool) -> Result<Status> {
176        let mut table_route = self.table_route()?.clone();
177        if retrying {
178            info!(
179                "Remapping region routes addresses for retrying create regions for table: {}",
180                self.data.table_ref()
181            );
182            let storage = self
183                .context
184                .table_metadata_manager
185                .table_route_manager()
186                .table_route_storage();
187            // The peer addresses may change during retries,
188            // so we always remap the region routes.
189            storage
190                .remap_region_routes(&mut table_route.region_routes)
191                .await?;
192        }
193        // Registers opening regions
194        let guards = self.register_opening_regions(&self.context, &table_route.region_routes)?;
195        if !guards.is_empty() {
196            self.opening_regions = guards;
197        }
198        self.create_regions(&table_route.region_routes).await
199    }
200
201    async fn create_regions(&mut self, region_routes: &[RegionRoute]) -> Result<Status> {
202        let table_id = self.table_id();
203        let region_wal_options = self.region_wal_options()?;
204        let column_metadatas = self
205            .executor
206            .on_create_regions(
207                &self.context.node_manager,
208                table_id,
209                region_routes,
210                region_wal_options,
211            )
212            .await?;
213
214        self.data.column_metadatas = column_metadatas;
215        self.data.state = CreateTableState::CreateMetadata;
216        Ok(Status::executing(true))
217    }
218
219    /// Creates table metadata
220    ///
221    /// Abort(not-retry):
222    /// - Failed to create table metadata.
223    async fn on_create_metadata(&mut self, pid: ProcedureId) -> Result<Status> {
224        let table_id = self.table_id();
225        let table_ref = self.data.table_ref();
226        let manager = &self.context.table_metadata_manager;
227
228        let raw_table_info = self.table_info().clone();
229        // Safety: the region_wal_options must be allocated.
230        let region_wal_options = self.region_wal_options()?.clone();
231        // Safety: the table_route must be allocated.
232        let physical_table_route = self.table_route()?.clone();
233        self.executor
234            .on_create_metadata(
235                manager,
236                &self.context.region_failure_detector_controller,
237                raw_table_info,
238                &self.data.column_metadatas,
239                physical_table_route,
240                region_wal_options,
241            )
242            .await?;
243
244        info!(
245            "Successfully created table: {}, table_id: {}, procedure_id: {}",
246            table_ref, table_id, pid
247        );
248
249        self.opening_regions.clear();
250        Ok(Status::done_with_output(table_id))
251    }
252
253    /// Registers and returns the guards of the opening region if they don't exist.
254    fn register_opening_regions(
255        &self,
256        context: &DdlContext,
257        region_routes: &[RegionRoute],
258    ) -> Result<Vec<OperatingRegionGuard>> {
259        let opening_regions = operating_leader_region_roles(region_routes);
260        if self.opening_regions.len() == opening_regions.len() {
261            return Ok(vec![]);
262        }
263
264        let mut opening_region_guards = Vec::with_capacity(opening_regions.len());
265
266        for (region_id, datanode_id, role) in opening_regions {
267            let guard = context
268                .memory_region_keeper
269                .register_with_role(datanode_id, region_id, role)
270                .context(error::RegionOperatingRaceSnafu {
271                    region_id,
272                    peer_id: datanode_id,
273                })?;
274            opening_region_guards.push(guard);
275        }
276        Ok(opening_region_guards)
277    }
278
279    pub fn set_allocated_metadata(
280        &mut self,
281        table_id: TableId,
282        table_route: PhysicalTableRouteValue,
283        region_wal_options: HashMap<RegionNumber, String>,
284    ) {
285        self.data.task.table_info.ident.table_id = table_id;
286        self.data.table_route = Some(table_route);
287        self.data.region_wal_options = Some(region_wal_options);
288    }
289}
290
291#[async_trait]
292impl Procedure for CreateTableProcedure {
293    fn type_name(&self) -> &str {
294        Self::TYPE_NAME
295    }
296
297    fn recover(&mut self) -> ProcedureResult<()> {
298        // Only registers regions if the table route is allocated.
299        if let Some(x) = &self.data.table_route {
300            self.opening_regions = self
301                .register_opening_regions(&self.context, &x.region_routes)
302                .map_err(BoxedError::new)
303                .context(ExternalSnafu {
304                    clean_poisons: false,
305                })?;
306        }
307
308        Ok(())
309    }
310
311    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
312        let state = &self.data.state;
313
314        let _timer = metrics::METRIC_META_PROCEDURE_CREATE_TABLE
315            .with_label_values(&[state.as_ref()])
316            .start_timer();
317
318        match state {
319            CreateTableState::Prepare => self.on_prepare().await,
320            CreateTableState::DatanodeCreateRegions => {
321                let retrying = ctx.is_retrying().await.unwrap_or(false);
322                self.on_datanode_create_regions(retrying).await
323            }
324            CreateTableState::CreateMetadata => self.on_create_metadata(ctx.procedure_id).await,
325        }
326        .map_err(map_to_procedure_error)
327    }
328
329    fn dump(&self) -> ProcedureResult<String> {
330        serde_json::to_string(&self.data).context(ToJsonSnafu)
331    }
332
333    fn lock_key(&self) -> LockKey {
334        let table_ref = &self.data.table_ref();
335
336        LockKey::new(vec![
337            CatalogLock::Read(table_ref.catalog).into(),
338            SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
339            TableNameLock::new(table_ref.catalog, table_ref.schema, table_ref.table).into(),
340        ])
341    }
342}
343
344#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
345pub enum CreateTableState {
346    /// Prepares to create the table
347    Prepare,
348    /// Creates regions on the Datanode
349    DatanodeCreateRegions,
350    /// Creates metadata
351    CreateMetadata,
352}
353
354#[derive(Debug, Serialize, Deserialize)]
355pub struct CreateTableData {
356    pub state: CreateTableState,
357    pub task: CreateTableTask,
358    #[serde(default)]
359    pub column_metadatas: Vec<ColumnMetadata>,
360    /// None stands for not allocated yet.
361    pub(crate) table_route: Option<PhysicalTableRouteValue>,
362    /// None stands for not allocated yet.
363    pub region_wal_options: Option<HashMap<RegionNumber, String>>,
364}
365
366impl CreateTableData {
367    pub fn new(task: CreateTableTask) -> Self {
368        CreateTableData {
369            state: CreateTableState::Prepare,
370            column_metadatas: vec![],
371            task,
372            table_route: None,
373            region_wal_options: None,
374        }
375    }
376
377    fn table_ref(&self) -> TableReference<'_> {
378        self.task.table_ref()
379    }
380}