Skip to main content

common_meta/ddl/
create_table.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod executor;
16pub mod template;
17
18use api::v1::CreateTableExpr;
19use async_trait::async_trait;
20use common_error::ext::BoxedError;
21use common_procedure::error::{
22    ExternalSnafu, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
23};
24use common_procedure::local::DynamicKeyLockGuard;
25use common_procedure::{Context as ProcedureContext, LockKey, Procedure, ProcedureId, Status};
26use common_telemetry::info;
27use serde::{Deserialize, Serialize};
28use snafu::{OptionExt, ResultExt};
29use store_api::metadata::ColumnMetadata;
30use strum::AsRefStr;
31use table::metadata::{TableId, TableInfo};
32use table::table_name::TableName;
33use table::table_reference::TableReference;
34pub(crate) use template::{CreateRequestBuilder, build_template_from_raw_table_info};
35
36use crate::ddl::create_table::executor::CreateTableExecutor;
37use crate::ddl::create_table::template::build_template;
38use crate::ddl::utils::map_to_procedure_error;
39use crate::ddl::{DdlContext, TableMetadata};
40use crate::error::{self, Result};
41use crate::key::table_route::PhysicalTableRouteValue;
42use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock};
43use crate::metrics;
44use crate::peer::PeerAllocContext;
45use crate::region_keeper::OperatingRegionGuard;
46use crate::rpc::ddl::{CreateTableTask, QueryContext};
47use crate::rpc::router::{RegionRoute, operating_leader_region_roles};
48use crate::wal_provider::{
49    RegionWalOptions, acquire_remote_wal_read_locks, optional_region_wal_options_serde,
50    refresh_initial_pruned_entry_ids,
51};
52
/// DDL procedure that creates a table.
///
/// It steps through [CreateTableState]: prepare (existence check and metadata
/// allocation), creating the regions on datanodes, and creating the table
/// metadata.
pub struct CreateTableProcedure {
    /// Shared DDL dependencies used by the steps (table metadata manager and
    /// allocator, node manager, memory region keeper, ...).
    pub context: DdlContext,
    /// The serializable data; this is the only part persisted by `dump` and
    /// restored by `from_json`.
    pub data: CreateTableData,
    /// The guards of opening regions registered in the memory region keeper.
    /// Not persisted; re-registered by `recover`.
    pub opening_regions: Vec<OperatingRegionGuard>,
    /// The executor of the procedure. Not persisted; rebuilt from the task in
    /// `from_json`.
    pub executor: CreateTableExecutor,
    /// The guards of remote WAL topic read locks. Not persisted; acquired
    /// lazily while executing and released once the metadata is created.
    remote_wal_lock_guards: Vec<DynamicKeyLockGuard>,
}
64
65fn build_executor_from_create_table_data(
66    create_table_expr: &CreateTableExpr,
67) -> Result<CreateTableExecutor> {
68    let template = build_template(create_table_expr)?;
69    let builder = CreateRequestBuilder::new(template, None);
70    let table_name = TableName::new(
71        create_table_expr.catalog_name.clone(),
72        create_table_expr.schema_name.clone(),
73        create_table_expr.table_name.clone(),
74    );
75    let executor =
76        CreateTableExecutor::new(table_name, create_table_expr.create_if_not_exists, builder);
77    Ok(executor)
78}
79
80impl CreateTableProcedure {
81    pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateTable";
82
83    pub fn new(task: CreateTableTask, context: DdlContext) -> Result<Self> {
84        Self::new_with_query_context(task, QueryContext::default(), context)
85    }
86
87    pub fn new_with_query_context(
88        task: CreateTableTask,
89        query_context: QueryContext,
90        context: DdlContext,
91    ) -> Result<Self> {
92        let executor = build_executor_from_create_table_data(&task.create_table)?;
93
94        Ok(Self {
95            context,
96            data: CreateTableData::new(task, query_context),
97            opening_regions: vec![],
98            executor,
99            remote_wal_lock_guards: vec![],
100        })
101    }
102
103    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
104        let data: CreateTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
105        let create_table_expr = &data.task.create_table;
106        let executor = build_executor_from_create_table_data(create_table_expr)
107            .map_err(BoxedError::new)
108            .context(ExternalSnafu {
109                clean_poisons: false,
110            })?;
111
112        Ok(CreateTableProcedure {
113            context,
114            data,
115            opening_regions: vec![],
116            executor,
117            remote_wal_lock_guards: vec![],
118        })
119    }
120
121    fn table_info(&self) -> &TableInfo {
122        &self.data.task.table_info
123    }
124
125    pub(crate) fn table_id(&self) -> TableId {
126        self.table_info().ident.table_id
127    }
128
129    fn region_wal_options(&self) -> Result<&RegionWalOptions> {
130        self.data
131            .region_wal_options
132            .as_ref()
133            .context(error::UnexpectedSnafu {
134                err_msg: "region_wal_options is not allocated",
135            })
136    }
137
138    fn table_route(&self) -> Result<&PhysicalTableRouteValue> {
139        self.data
140            .table_route
141            .as_ref()
142            .context(error::UnexpectedSnafu {
143                err_msg: "table_route is not allocated",
144            })
145    }
146
147    /// On the prepare step, it performs:
148    /// - Checks whether the table exists.
149    /// - Allocates the table id.
150    ///
151    /// Abort(non-retry):
152    /// - TableName exists and `create_if_not_exists` is false.
153    /// - Failed to allocate [TableMetadata].
154    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
155        let table_id = self
156            .executor
157            .on_prepare(&self.context.table_metadata_manager)
158            .await?;
159        // Return the table id if the table already exists.
160        if let Some(table_id) = table_id {
161            return Ok(Status::done_with_output(table_id));
162        }
163
164        self.data.state = CreateTableState::DatanodeCreateRegions;
165        let TableMetadata {
166            table_id,
167            table_route,
168            region_wal_options,
169        } = self
170            .context
171            .table_metadata_allocator
172            .create_with_context(
173                &self.data.task,
174                &PeerAllocContext {
175                    extensions: self.data.query_context.extensions.clone(),
176                },
177            )
178            .await?;
179        self.set_allocated_metadata(table_id, table_route, region_wal_options);
180
181        Ok(Status::executing(true))
182    }
183
184    async fn ensure_remote_wal_read_locks(&mut self, ctx: &ProcedureContext) -> Result<()> {
185        if !self.remote_wal_lock_guards.is_empty() {
186            return Ok(());
187        }
188
189        self.remote_wal_lock_guards =
190            acquire_remote_wal_read_locks(ctx, self.region_wal_options()?).await;
191
192        Ok(())
193    }
194
195    async fn refresh_initial_pruned_entry_ids(&mut self) -> Result<()> {
196        let region_wal_options =
197            self.data
198                .region_wal_options
199                .as_mut()
200                .context(error::UnexpectedSnafu {
201                    err_msg: "region_wal_options is not allocated",
202                })?;
203        refresh_initial_pruned_entry_ids(&self.context.table_metadata_manager, region_wal_options)
204            .await
205    }
206
207    /// Creates regions on datanodes
208    ///
209    /// Abort(non-retry):
210    /// - Failed to create [CreateRequestBuilder].
211    /// - Failed to get the table route of physical table (for logical table).
212    ///
213    /// Retry:
214    /// - If the underlying servers returns one of the following [Code](tonic::status::Code):
215    ///   - [Code::Cancelled](tonic::status::Code::Cancelled)
216    ///   - [Code::DeadlineExceeded](tonic::status::Code::DeadlineExceeded)
217    ///   - [Code::Unavailable](tonic::status::Code::Unavailable)
218    pub async fn on_datanode_create_regions(&mut self, retrying: bool) -> Result<Status> {
219        let mut table_route = self.table_route()?.clone();
220        if retrying {
221            info!(
222                "Remapping region routes addresses for retrying create regions for table: {}",
223                self.data.table_ref()
224            );
225            let storage = self
226                .context
227                .table_metadata_manager
228                .table_route_manager()
229                .table_route_storage();
230            // The peer addresses may change during retries,
231            // so we always remap the region routes.
232            storage
233                .remap_region_routes(&mut table_route.region_routes)
234                .await?;
235        }
236        // Registers opening regions
237        let guards = self.register_opening_regions(&self.context, &table_route.region_routes)?;
238        if !guards.is_empty() {
239            self.opening_regions = guards;
240        }
241        self.create_regions(&table_route.region_routes).await
242    }
243
244    async fn create_regions(&mut self, region_routes: &[RegionRoute]) -> Result<Status> {
245        let table_id = self.table_id();
246        let region_wal_options = self.region_wal_options()?;
247        let column_metadatas = self
248            .executor
249            .on_create_regions(
250                &self.context.node_manager,
251                table_id,
252                region_routes,
253                region_wal_options,
254            )
255            .await?;
256
257        self.data.column_metadatas = column_metadatas;
258        self.data.state = CreateTableState::CreateMetadata;
259        Ok(Status::executing(true))
260    }
261
262    /// Creates table metadata
263    ///
264    /// Abort(not-retry):
265    /// - Failed to create table metadata.
266    async fn on_create_metadata(&mut self, pid: ProcedureId) -> Result<Status> {
267        let table_id = self.table_id();
268        let table_ref = self.data.table_ref();
269        let manager = &self.context.table_metadata_manager;
270
271        let raw_table_info = self.table_info().clone();
272        // Safety: the region_wal_options must be allocated.
273        let region_wal_options = self.region_wal_options()?.clone();
274        // Safety: the table_route must be allocated.
275        let physical_table_route = self.table_route()?.clone();
276        self.executor
277            .on_create_metadata(
278                manager,
279                &self.context.region_failure_detector_controller,
280                raw_table_info,
281                &self.data.column_metadatas,
282                physical_table_route,
283                region_wal_options,
284            )
285            .await?;
286
287        info!(
288            "Successfully created table: {}, table_id: {}, procedure_id: {}",
289            table_ref, table_id, pid
290        );
291
292        self.opening_regions.clear();
293        self.remote_wal_lock_guards.clear();
294        Ok(Status::done_with_output(table_id))
295    }
296
297    /// Registers and returns the guards of the opening region if they don't exist.
298    fn register_opening_regions(
299        &self,
300        context: &DdlContext,
301        region_routes: &[RegionRoute],
302    ) -> Result<Vec<OperatingRegionGuard>> {
303        let opening_regions = operating_leader_region_roles(region_routes);
304        if self.opening_regions.len() == opening_regions.len() {
305            return Ok(vec![]);
306        }
307
308        let mut opening_region_guards = Vec::with_capacity(opening_regions.len());
309
310        for (region_id, datanode_id, role) in opening_regions {
311            let guard = context
312                .memory_region_keeper
313                .register_with_role(datanode_id, region_id, role)
314                .context(error::RegionOperatingRaceSnafu {
315                    region_id,
316                    peer_id: datanode_id,
317                })?;
318            opening_region_guards.push(guard);
319        }
320        Ok(opening_region_guards)
321    }
322
323    pub fn set_allocated_metadata(
324        &mut self,
325        table_id: TableId,
326        table_route: PhysicalTableRouteValue,
327        region_wal_options: RegionWalOptions,
328    ) {
329        self.data.task.table_info.ident.table_id = table_id;
330        self.data.table_route = Some(table_route);
331        self.data.region_wal_options = Some(region_wal_options);
332    }
333}
334
335#[async_trait]
336impl Procedure for CreateTableProcedure {
337    fn type_name(&self) -> &str {
338        Self::TYPE_NAME
339    }
340
341    fn recover(&mut self) -> ProcedureResult<()> {
342        // Only registers regions if the table route is allocated.
343        if let Some(x) = &self.data.table_route {
344            self.opening_regions = self
345                .register_opening_regions(&self.context, &x.region_routes)
346                .map_err(BoxedError::new)
347                .context(ExternalSnafu {
348                    clean_poisons: false,
349                })?;
350        }
351
352        Ok(())
353    }
354
355    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
356        let state = &self.data.state;
357
358        let _timer = metrics::METRIC_META_PROCEDURE_CREATE_TABLE
359            .with_label_values(&[state.as_ref()])
360            .start_timer();
361
362        match state {
363            CreateTableState::Prepare => self.on_prepare().await,
364            CreateTableState::DatanodeCreateRegions => {
365                async {
366                    self.ensure_remote_wal_read_locks(ctx).await?;
367                    self.refresh_initial_pruned_entry_ids().await?;
368                    let retrying = ctx.is_retrying().await.unwrap_or(false);
369                    self.on_datanode_create_regions(retrying).await
370                }
371                .await
372            }
373            CreateTableState::CreateMetadata => {
374                async {
375                    self.ensure_remote_wal_read_locks(ctx).await?;
376                    self.on_create_metadata(ctx.procedure_id).await
377                }
378                .await
379            }
380        }
381        .map_err(map_to_procedure_error)
382    }
383
384    fn dump(&self) -> ProcedureResult<String> {
385        serde_json::to_string(&self.data).context(ToJsonSnafu)
386    }
387
388    fn lock_key(&self) -> LockKey {
389        let table_ref = &self.data.table_ref();
390
391        LockKey::new(vec![
392            CatalogLock::Read(table_ref.catalog).into(),
393            SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
394            TableNameLock::new(table_ref.catalog, table_ref.schema, table_ref.table).into(),
395        ])
396    }
397}
398
/// The persisted step of a [CreateTableProcedure].
///
/// The string form produced by `AsRefStr` is used as the metric label for the
/// step being executed.
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
pub enum CreateTableState {
    /// Prepares to create the table: checks whether it exists and allocates
    /// its metadata.
    Prepare,
    /// Creates regions on the Datanode
    DatanodeCreateRegions,
    /// Creates metadata
    CreateMetadata,
}
408
/// Serializable state of a [CreateTableProcedure].
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateTableData {
    /// Current step of the procedure.
    pub state: CreateTableState,
    /// The create table task; its table id is overwritten once allocated.
    pub task: CreateTableTask,
    /// Query context of the request; its extensions are passed to peer
    /// allocation. Falls back to the default when absent from the JSON.
    #[serde(default)]
    pub query_context: QueryContext,
    /// Column metadata returned by the executor when the regions are created.
    #[serde(default)]
    pub column_metadatas: Vec<ColumnMetadata>,
    /// None stands for not allocated yet.
    pub(crate) table_route: Option<PhysicalTableRouteValue>,
    /// None stands for not allocated yet.
    #[serde(default)]
    #[serde(with = "optional_region_wal_options_serde")]
    pub region_wal_options: Option<RegionWalOptions>,
}
424
425impl CreateTableData {
426    pub fn new(task: CreateTableTask, query_context: QueryContext) -> Self {
427        CreateTableData {
428            state: CreateTableState::Prepare,
429            column_metadatas: vec![],
430            task,
431            query_context,
432            table_route: None,
433            region_wal_options: None,
434        }
435    }
436
437    fn table_ref(&self) -> TableReference<'_> {
438        self.task.table_ref()
439    }
440}