Skip to main content

common_meta/ddl/
create_table.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15pub mod executor;
16pub mod template;
17
18use std::collections::HashMap;
19
20use api::v1::CreateTableExpr;
21use async_trait::async_trait;
22use common_error::ext::BoxedError;
23use common_procedure::error::{
24    ExternalSnafu, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
25};
26use common_procedure::{Context as ProcedureContext, LockKey, Procedure, ProcedureId, Status};
27use common_telemetry::info;
28use serde::{Deserialize, Serialize};
29use snafu::{OptionExt, ResultExt};
30use store_api::metadata::ColumnMetadata;
31use store_api::storage::RegionNumber;
32use strum::AsRefStr;
33use table::metadata::{TableId, TableInfo};
34use table::table_name::TableName;
35use table::table_reference::TableReference;
36pub(crate) use template::{CreateRequestBuilder, build_template_from_raw_table_info};
37
38use crate::ddl::create_table::executor::CreateTableExecutor;
39use crate::ddl::create_table::template::build_template;
40use crate::ddl::utils::map_to_procedure_error;
41use crate::ddl::{DdlContext, TableMetadata};
42use crate::error::{self, Result};
43use crate::key::table_route::PhysicalTableRouteValue;
44use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock};
45use crate::metrics;
46use crate::peer::PeerAllocContext;
47use crate::region_keeper::OperatingRegionGuard;
48use crate::rpc::ddl::{CreateTableTask, QueryContext};
49use crate::rpc::router::{RegionRoute, operating_leader_region_roles};
50
51pub struct CreateTableProcedure {
52    pub context: DdlContext,
53    /// The serializable data.
54    pub data: CreateTableData,
55    /// The guards of opening.
56    pub opening_regions: Vec<OperatingRegionGuard>,
57    /// The executor of the procedure.
58    pub executor: CreateTableExecutor,
59}
60
61fn build_executor_from_create_table_data(
62    create_table_expr: &CreateTableExpr,
63) -> Result<CreateTableExecutor> {
64    let template = build_template(create_table_expr)?;
65    let builder = CreateRequestBuilder::new(template, None);
66    let table_name = TableName::new(
67        create_table_expr.catalog_name.clone(),
68        create_table_expr.schema_name.clone(),
69        create_table_expr.table_name.clone(),
70    );
71    let executor =
72        CreateTableExecutor::new(table_name, create_table_expr.create_if_not_exists, builder);
73    Ok(executor)
74}
75
76impl CreateTableProcedure {
77    pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateTable";
78
79    pub fn new(task: CreateTableTask, context: DdlContext) -> Result<Self> {
80        Self::new_with_query_context(task, QueryContext::default(), context)
81    }
82
83    pub fn new_with_query_context(
84        task: CreateTableTask,
85        query_context: QueryContext,
86        context: DdlContext,
87    ) -> Result<Self> {
88        let executor = build_executor_from_create_table_data(&task.create_table)?;
89
90        Ok(Self {
91            context,
92            data: CreateTableData::new(task, query_context),
93            opening_regions: vec![],
94            executor,
95        })
96    }
97
98    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
99        let data: CreateTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
100        let create_table_expr = &data.task.create_table;
101        let executor = build_executor_from_create_table_data(create_table_expr)
102            .map_err(BoxedError::new)
103            .context(ExternalSnafu {
104                clean_poisons: false,
105            })?;
106
107        Ok(CreateTableProcedure {
108            context,
109            data,
110            opening_regions: vec![],
111            executor,
112        })
113    }
114
115    fn table_info(&self) -> &TableInfo {
116        &self.data.task.table_info
117    }
118
119    pub(crate) fn table_id(&self) -> TableId {
120        self.table_info().ident.table_id
121    }
122
123    fn region_wal_options(&self) -> Result<&HashMap<RegionNumber, String>> {
124        self.data
125            .region_wal_options
126            .as_ref()
127            .context(error::UnexpectedSnafu {
128                err_msg: "region_wal_options is not allocated",
129            })
130    }
131
132    fn table_route(&self) -> Result<&PhysicalTableRouteValue> {
133        self.data
134            .table_route
135            .as_ref()
136            .context(error::UnexpectedSnafu {
137                err_msg: "table_route is not allocated",
138            })
139    }
140
141    /// On the prepare step, it performs:
142    /// - Checks whether the table exists.
143    /// - Allocates the table id.
144    ///
145    /// Abort(non-retry):
146    /// - TableName exists and `create_if_not_exists` is false.
147    /// - Failed to allocate [TableMetadata].
148    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
149        let table_id = self
150            .executor
151            .on_prepare(&self.context.table_metadata_manager)
152            .await?;
153        // Return the table id if the table already exists.
154        if let Some(table_id) = table_id {
155            return Ok(Status::done_with_output(table_id));
156        }
157
158        self.data.state = CreateTableState::DatanodeCreateRegions;
159        let TableMetadata {
160            table_id,
161            table_route,
162            region_wal_options,
163        } = self
164            .context
165            .table_metadata_allocator
166            .create_with_context(
167                &self.data.task,
168                &PeerAllocContext {
169                    extensions: self.data.query_context.extensions.clone(),
170                },
171            )
172            .await?;
173        self.set_allocated_metadata(table_id, table_route, region_wal_options);
174
175        Ok(Status::executing(true))
176    }
177
178    /// Creates regions on datanodes
179    ///
180    /// Abort(non-retry):
181    /// - Failed to create [CreateRequestBuilder].
182    /// - Failed to get the table route of physical table (for logical table).
183    ///
184    /// Retry:
185    /// - If the underlying servers returns one of the following [Code](tonic::status::Code):
186    ///   - [Code::Cancelled](tonic::status::Code::Cancelled)
187    ///   - [Code::DeadlineExceeded](tonic::status::Code::DeadlineExceeded)
188    ///   - [Code::Unavailable](tonic::status::Code::Unavailable)
189    pub async fn on_datanode_create_regions(&mut self, retrying: bool) -> Result<Status> {
190        let mut table_route = self.table_route()?.clone();
191        if retrying {
192            info!(
193                "Remapping region routes addresses for retrying create regions for table: {}",
194                self.data.table_ref()
195            );
196            let storage = self
197                .context
198                .table_metadata_manager
199                .table_route_manager()
200                .table_route_storage();
201            // The peer addresses may change during retries,
202            // so we always remap the region routes.
203            storage
204                .remap_region_routes(&mut table_route.region_routes)
205                .await?;
206        }
207        // Registers opening regions
208        let guards = self.register_opening_regions(&self.context, &table_route.region_routes)?;
209        if !guards.is_empty() {
210            self.opening_regions = guards;
211        }
212        self.create_regions(&table_route.region_routes).await
213    }
214
215    async fn create_regions(&mut self, region_routes: &[RegionRoute]) -> Result<Status> {
216        let table_id = self.table_id();
217        let region_wal_options = self.region_wal_options()?;
218        let column_metadatas = self
219            .executor
220            .on_create_regions(
221                &self.context.node_manager,
222                table_id,
223                region_routes,
224                region_wal_options,
225            )
226            .await?;
227
228        self.data.column_metadatas = column_metadatas;
229        self.data.state = CreateTableState::CreateMetadata;
230        Ok(Status::executing(true))
231    }
232
233    /// Creates table metadata
234    ///
235    /// Abort(not-retry):
236    /// - Failed to create table metadata.
237    async fn on_create_metadata(&mut self, pid: ProcedureId) -> Result<Status> {
238        let table_id = self.table_id();
239        let table_ref = self.data.table_ref();
240        let manager = &self.context.table_metadata_manager;
241
242        let raw_table_info = self.table_info().clone();
243        // Safety: the region_wal_options must be allocated.
244        let region_wal_options = self.region_wal_options()?.clone();
245        // Safety: the table_route must be allocated.
246        let physical_table_route = self.table_route()?.clone();
247        self.executor
248            .on_create_metadata(
249                manager,
250                &self.context.region_failure_detector_controller,
251                raw_table_info,
252                &self.data.column_metadatas,
253                physical_table_route,
254                region_wal_options,
255            )
256            .await?;
257
258        info!(
259            "Successfully created table: {}, table_id: {}, procedure_id: {}",
260            table_ref, table_id, pid
261        );
262
263        self.opening_regions.clear();
264        Ok(Status::done_with_output(table_id))
265    }
266
267    /// Registers and returns the guards of the opening region if they don't exist.
268    fn register_opening_regions(
269        &self,
270        context: &DdlContext,
271        region_routes: &[RegionRoute],
272    ) -> Result<Vec<OperatingRegionGuard>> {
273        let opening_regions = operating_leader_region_roles(region_routes);
274        if self.opening_regions.len() == opening_regions.len() {
275            return Ok(vec![]);
276        }
277
278        let mut opening_region_guards = Vec::with_capacity(opening_regions.len());
279
280        for (region_id, datanode_id, role) in opening_regions {
281            let guard = context
282                .memory_region_keeper
283                .register_with_role(datanode_id, region_id, role)
284                .context(error::RegionOperatingRaceSnafu {
285                    region_id,
286                    peer_id: datanode_id,
287                })?;
288            opening_region_guards.push(guard);
289        }
290        Ok(opening_region_guards)
291    }
292
293    pub fn set_allocated_metadata(
294        &mut self,
295        table_id: TableId,
296        table_route: PhysicalTableRouteValue,
297        region_wal_options: HashMap<RegionNumber, String>,
298    ) {
299        self.data.task.table_info.ident.table_id = table_id;
300        self.data.table_route = Some(table_route);
301        self.data.region_wal_options = Some(region_wal_options);
302    }
303}
304
305#[async_trait]
306impl Procedure for CreateTableProcedure {
307    fn type_name(&self) -> &str {
308        Self::TYPE_NAME
309    }
310
311    fn recover(&mut self) -> ProcedureResult<()> {
312        // Only registers regions if the table route is allocated.
313        if let Some(x) = &self.data.table_route {
314            self.opening_regions = self
315                .register_opening_regions(&self.context, &x.region_routes)
316                .map_err(BoxedError::new)
317                .context(ExternalSnafu {
318                    clean_poisons: false,
319                })?;
320        }
321
322        Ok(())
323    }
324
325    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
326        let state = &self.data.state;
327
328        let _timer = metrics::METRIC_META_PROCEDURE_CREATE_TABLE
329            .with_label_values(&[state.as_ref()])
330            .start_timer();
331
332        match state {
333            CreateTableState::Prepare => self.on_prepare().await,
334            CreateTableState::DatanodeCreateRegions => {
335                let retrying = ctx.is_retrying().await.unwrap_or(false);
336                self.on_datanode_create_regions(retrying).await
337            }
338            CreateTableState::CreateMetadata => self.on_create_metadata(ctx.procedure_id).await,
339        }
340        .map_err(map_to_procedure_error)
341    }
342
343    fn dump(&self) -> ProcedureResult<String> {
344        serde_json::to_string(&self.data).context(ToJsonSnafu)
345    }
346
347    fn lock_key(&self) -> LockKey {
348        let table_ref = &self.data.table_ref();
349
350        LockKey::new(vec![
351            CatalogLock::Read(table_ref.catalog).into(),
352            SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
353            TableNameLock::new(table_ref.catalog, table_ref.schema, table_ref.table).into(),
354        ])
355    }
356}
357
358#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
359pub enum CreateTableState {
360    /// Prepares to create the table
361    Prepare,
362    /// Creates regions on the Datanode
363    DatanodeCreateRegions,
364    /// Creates metadata
365    CreateMetadata,
366}
367
368#[derive(Debug, Serialize, Deserialize)]
369pub struct CreateTableData {
370    pub state: CreateTableState,
371    pub task: CreateTableTask,
372    #[serde(default)]
373    pub query_context: QueryContext,
374    #[serde(default)]
375    pub column_metadatas: Vec<ColumnMetadata>,
376    /// None stands for not allocated yet.
377    pub(crate) table_route: Option<PhysicalTableRouteValue>,
378    /// None stands for not allocated yet.
379    pub region_wal_options: Option<HashMap<RegionNumber, String>>,
380}
381
382impl CreateTableData {
383    pub fn new(task: CreateTableTask, query_context: QueryContext) -> Self {
384        CreateTableData {
385            state: CreateTableState::Prepare,
386            column_metadatas: vec![],
387            task,
388            query_context,
389            table_route: None,
390            region_wal_options: None,
391        }
392    }
393
394    fn table_ref(&self) -> TableReference<'_> {
395        self.task.table_ref()
396    }
397}