common_meta/ddl/
create_table.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use api::v1::region::region_request::Body as PbRegionRequest;
18use api::v1::region::{RegionRequest, RegionRequestHeader};
19use async_trait::async_trait;
20use common_error::ext::BoxedError;
21use common_procedure::error::{
22    ExternalSnafu, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
23};
24use common_procedure::{Context as ProcedureContext, LockKey, Procedure, ProcedureId, Status};
25use common_telemetry::tracing_context::TracingContext;
26use common_telemetry::{info, warn};
27use futures::future::join_all;
28use serde::{Deserialize, Serialize};
29use snafu::{OptionExt, ResultExt, ensure};
30use store_api::metadata::ColumnMetadata;
31use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY;
32use store_api::storage::{RegionId, RegionNumber};
33use strum::AsRefStr;
34use table::metadata::{RawTableInfo, TableId};
35use table::table_reference::TableReference;
36
37use crate::ddl::create_table_template::{CreateRequestBuilder, build_template};
38use crate::ddl::utils::raw_table_info::update_table_info_column_ids;
39use crate::ddl::utils::{
40    add_peer_context_if_needed, convert_region_routes_to_detecting_regions,
41    extract_column_metadatas, map_to_procedure_error, region_storage_path,
42};
43use crate::ddl::{DdlContext, TableMetadata};
44use crate::error::{self, Result};
45use crate::key::table_name::TableNameKey;
46use crate::key::table_route::{PhysicalTableRouteValue, TableRouteValue};
47use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock};
48use crate::metrics;
49use crate::region_keeper::OperatingRegionGuard;
50use crate::rpc::ddl::CreateTableTask;
51use crate::rpc::router::{
52    RegionRoute, find_leader_regions, find_leaders, operating_leader_regions,
53};
/// Procedure that creates a table: it checks the table name, allocates the
/// table metadata, creates the regions on the datanodes and finally persists
/// the table metadata.
pub struct CreateTableProcedure {
    /// The DDL context; gives access to the metadata manager and allocator, the
    /// node manager and the memory region keeper used by the steps.
    pub context: DdlContext,
    /// Holds the serializable procedure data and the guards of the opening regions.
    pub creator: TableCreator,
}
58
59impl CreateTableProcedure {
60    pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateTable";
61
62    pub fn new(task: CreateTableTask, context: DdlContext) -> Self {
63        Self {
64            context,
65            creator: TableCreator::new(task),
66        }
67    }
68
69    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
70        let data = serde_json::from_str(json).context(FromJsonSnafu)?;
71
72        Ok(CreateTableProcedure {
73            context,
74            creator: TableCreator {
75                data,
76                opening_regions: vec![],
77            },
78        })
79    }
80
81    fn table_info(&self) -> &RawTableInfo {
82        &self.creator.data.task.table_info
83    }
84
85    pub(crate) fn table_id(&self) -> TableId {
86        self.table_info().ident.table_id
87    }
88
89    fn region_wal_options(&self) -> Result<&HashMap<RegionNumber, String>> {
90        self.creator
91            .data
92            .region_wal_options
93            .as_ref()
94            .context(error::UnexpectedSnafu {
95                err_msg: "region_wal_options is not allocated",
96            })
97    }
98
99    fn table_route(&self) -> Result<&PhysicalTableRouteValue> {
100        self.creator
101            .data
102            .table_route
103            .as_ref()
104            .context(error::UnexpectedSnafu {
105                err_msg: "table_route is not allocated",
106            })
107    }
108
109    #[cfg(any(test, feature = "testing"))]
110    pub fn set_allocated_metadata(
111        &mut self,
112        table_id: TableId,
113        table_route: PhysicalTableRouteValue,
114        region_wal_options: HashMap<RegionNumber, String>,
115    ) {
116        self.creator
117            .set_allocated_metadata(table_id, table_route, region_wal_options)
118    }
119
120    /// On the prepare step, it performs:
121    /// - Checks whether the table exists.
122    /// - Allocates the table id.
123    ///
124    /// Abort(non-retry):
125    /// - TableName exists and `create_if_not_exists` is false.
126    /// - Failed to allocate [TableMetadata].
127    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
128        let expr = &self.creator.data.task.create_table;
129        let table_name_value = self
130            .context
131            .table_metadata_manager
132            .table_name_manager()
133            .get(TableNameKey::new(
134                &expr.catalog_name,
135                &expr.schema_name,
136                &expr.table_name,
137            ))
138            .await?;
139
140        if let Some(value) = table_name_value {
141            ensure!(
142                expr.create_if_not_exists,
143                error::TableAlreadyExistsSnafu {
144                    table_name: self.creator.data.table_ref().to_string(),
145                }
146            );
147
148            let table_id = value.table_id();
149            return Ok(Status::done_with_output(table_id));
150        }
151
152        self.creator.data.state = CreateTableState::DatanodeCreateRegions;
153        let TableMetadata {
154            table_id,
155            table_route,
156            region_wal_options,
157        } = self
158            .context
159            .table_metadata_allocator
160            .create(&self.creator.data.task)
161            .await?;
162        self.creator
163            .set_allocated_metadata(table_id, table_route, region_wal_options);
164
165        Ok(Status::executing(true))
166    }
167
168    pub fn new_region_request_builder(
169        &self,
170        physical_table_id: Option<TableId>,
171    ) -> Result<CreateRequestBuilder> {
172        let create_table_expr = &self.creator.data.task.create_table;
173        let template = build_template(create_table_expr)?;
174        Ok(CreateRequestBuilder::new(template, physical_table_id))
175    }
176
177    /// Creates regions on datanodes
178    ///
179    /// Abort(non-retry):
180    /// - Failed to create [CreateRequestBuilder].
181    /// - Failed to get the table route of physical table (for logical table).
182    ///
183    /// Retry:
184    /// - If the underlying servers returns one of the following [Code](tonic::status::Code):
185    ///   - [Code::Cancelled](tonic::status::Code::Cancelled)
186    ///   - [Code::DeadlineExceeded](tonic::status::Code::DeadlineExceeded)
187    ///   - [Code::Unavailable](tonic::status::Code::Unavailable)
188    pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
189        let table_route = self.table_route()?.clone();
190        let request_builder = self.new_region_request_builder(None)?;
191        // Registers opening regions
192        let guards = self
193            .creator
194            .register_opening_regions(&self.context, &table_route.region_routes)?;
195        if !guards.is_empty() {
196            self.creator.opening_regions = guards;
197        }
198        self.create_regions(&table_route.region_routes, request_builder)
199            .await
200    }
201
202    async fn create_regions(
203        &mut self,
204        region_routes: &[RegionRoute],
205        request_builder: CreateRequestBuilder,
206    ) -> Result<Status> {
207        let create_table_data = &self.creator.data;
208        // Safety: the region_wal_options must be allocated
209        let region_wal_options = self.region_wal_options()?;
210        let create_table_expr = &create_table_data.task.create_table;
211        let catalog = &create_table_expr.catalog_name;
212        let schema = &create_table_expr.schema_name;
213        let storage_path = region_storage_path(catalog, schema);
214        let leaders = find_leaders(region_routes);
215        let mut create_region_tasks = Vec::with_capacity(leaders.len());
216
217        let partition_exprs = region_routes
218            .iter()
219            .map(|r| (r.region.id.region_number(), r.region.partition_expr()))
220            .collect();
221
222        for datanode in leaders {
223            let requester = self.context.node_manager.datanode(&datanode).await;
224
225            let regions = find_leader_regions(region_routes, &datanode);
226            let mut requests = Vec::with_capacity(regions.len());
227            for region_number in regions {
228                let region_id = RegionId::new(self.table_id(), region_number);
229                let create_region_request = request_builder.build_one(
230                    region_id,
231                    storage_path.clone(),
232                    region_wal_options,
233                    &partition_exprs,
234                );
235                requests.push(PbRegionRequest::Create(create_region_request));
236            }
237
238            for request in requests {
239                let request = RegionRequest {
240                    header: Some(RegionRequestHeader {
241                        tracing_context: TracingContext::from_current_span().to_w3c(),
242                        ..Default::default()
243                    }),
244                    body: Some(request),
245                };
246
247                let datanode = datanode.clone();
248                let requester = requester.clone();
249                create_region_tasks.push(async move {
250                    requester
251                        .handle(request)
252                        .await
253                        .map_err(add_peer_context_if_needed(datanode))
254                });
255            }
256        }
257
258        let mut results = join_all(create_region_tasks)
259            .await
260            .into_iter()
261            .collect::<Result<Vec<_>>>()?;
262
263        if let Some(column_metadatas) =
264            extract_column_metadatas(&mut results, TABLE_COLUMN_METADATA_EXTENSION_KEY)?
265        {
266            self.creator.data.column_metadatas = column_metadatas;
267        } else {
268            warn!(
269                "creating table result doesn't contains extension key `{TABLE_COLUMN_METADATA_EXTENSION_KEY}`,leaving the table's column metadata unchanged"
270            );
271        }
272
273        self.creator.data.state = CreateTableState::CreateMetadata;
274        Ok(Status::executing(true))
275    }
276
277    /// Creates table metadata
278    ///
279    /// Abort(not-retry):
280    /// - Failed to create table metadata.
281    async fn on_create_metadata(&mut self, pid: ProcedureId) -> Result<Status> {
282        let table_id = self.table_id();
283        let table_ref = self.creator.data.table_ref();
284        let manager = &self.context.table_metadata_manager;
285
286        let mut raw_table_info = self.table_info().clone();
287        if !self.creator.data.column_metadatas.is_empty() {
288            update_table_info_column_ids(&mut raw_table_info, &self.creator.data.column_metadatas);
289        }
290        // Safety: the region_wal_options must be allocated.
291        let region_wal_options = self.region_wal_options()?.clone();
292        // Safety: the table_route must be allocated.
293        let physical_table_route = self.table_route()?.clone();
294        let detecting_regions =
295            convert_region_routes_to_detecting_regions(&physical_table_route.region_routes);
296        let table_route = TableRouteValue::Physical(physical_table_route);
297        manager
298            .create_table_metadata(raw_table_info, table_route, region_wal_options)
299            .await?;
300        self.context
301            .register_failure_detectors(detecting_regions)
302            .await;
303        info!(
304            "Successfully created table: {}, table_id: {}, procedure_id: {}",
305            table_ref, table_id, pid
306        );
307
308        self.creator.opening_regions.clear();
309        Ok(Status::done_with_output(table_id))
310    }
311}
312
313#[async_trait]
314impl Procedure for CreateTableProcedure {
315    fn type_name(&self) -> &str {
316        Self::TYPE_NAME
317    }
318
319    fn recover(&mut self) -> ProcedureResult<()> {
320        // Only registers regions if the table route is allocated.
321        if let Some(x) = &self.creator.data.table_route {
322            self.creator.opening_regions = self
323                .creator
324                .register_opening_regions(&self.context, &x.region_routes)
325                .map_err(BoxedError::new)
326                .context(ExternalSnafu {
327                    clean_poisons: false,
328                })?;
329        }
330
331        Ok(())
332    }
333
334    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
335        let state = &self.creator.data.state;
336
337        let _timer = metrics::METRIC_META_PROCEDURE_CREATE_TABLE
338            .with_label_values(&[state.as_ref()])
339            .start_timer();
340
341        match state {
342            CreateTableState::Prepare => self.on_prepare().await,
343            CreateTableState::DatanodeCreateRegions => self.on_datanode_create_regions().await,
344            CreateTableState::CreateMetadata => self.on_create_metadata(ctx.procedure_id).await,
345        }
346        .map_err(map_to_procedure_error)
347    }
348
349    fn dump(&self) -> ProcedureResult<String> {
350        serde_json::to_string(&self.creator.data).context(ToJsonSnafu)
351    }
352
353    fn lock_key(&self) -> LockKey {
354        let table_ref = &self.creator.data.table_ref();
355
356        LockKey::new(vec![
357            CatalogLock::Read(table_ref.catalog).into(),
358            SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
359            TableNameLock::new(table_ref.catalog, table_ref.schema, table_ref.table).into(),
360        ])
361    }
362}
363
/// The state of a table creation: the serializable procedure data plus the
/// in-memory guards of the regions being opened.
pub struct TableCreator {
    /// The serializable data.
    pub data: CreateTableData,
    /// The guards of opening.
    /// They are not serialized; a procedure restored from JSON starts with none.
    pub opening_regions: Vec<OperatingRegionGuard>,
}
370
371impl TableCreator {
372    pub fn new(task: CreateTableTask) -> Self {
373        Self {
374            data: CreateTableData {
375                state: CreateTableState::Prepare,
376                column_metadatas: vec![],
377                task,
378                table_route: None,
379                region_wal_options: None,
380            },
381            opening_regions: vec![],
382        }
383    }
384
385    /// Registers and returns the guards of the opening region if they don't exist.
386    fn register_opening_regions(
387        &self,
388        context: &DdlContext,
389        region_routes: &[RegionRoute],
390    ) -> Result<Vec<OperatingRegionGuard>> {
391        let opening_regions = operating_leader_regions(region_routes);
392
393        if self.opening_regions.len() == opening_regions.len() {
394            return Ok(vec![]);
395        }
396
397        let mut opening_region_guards = Vec::with_capacity(opening_regions.len());
398
399        for (region_id, datanode_id) in opening_regions {
400            let guard = context
401                .memory_region_keeper
402                .register(datanode_id, region_id)
403                .context(error::RegionOperatingRaceSnafu {
404                    region_id,
405                    peer_id: datanode_id,
406                })?;
407            opening_region_guards.push(guard);
408        }
409        Ok(opening_region_guards)
410    }
411
412    fn set_allocated_metadata(
413        &mut self,
414        table_id: TableId,
415        table_route: PhysicalTableRouteValue,
416        region_wal_options: HashMap<RegionNumber, String>,
417    ) {
418        self.data.task.table_info.ident.table_id = table_id;
419        self.data.table_route = Some(table_route);
420        self.data.region_wal_options = Some(region_wal_options);
421    }
422}
423
/// The steps of the create table procedure.
///
/// The state is part of the serialized procedure data, and its name (via
/// `AsRefStr`) is used as the label of the procedure's timing metric.
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
pub enum CreateTableState {
    /// Prepares to create the table
    Prepare,
    /// Creates regions on the Datanode
    DatanodeCreateRegions,
    /// Creates metadata
    CreateMetadata,
}
433
/// The serializable data of the create table procedure; this is what the
/// procedure dumps to JSON and what it is restored from.
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateTableData {
    /// The current step of the procedure.
    pub state: CreateTableState,
    /// The task describing the table to create.
    pub task: CreateTableTask,
    /// The column metadatas extracted from the create-region responses;
    /// defaults to empty when absent from the serialized data.
    #[serde(default)]
    pub column_metadatas: Vec<ColumnMetadata>,
    /// None stands for not allocated yet.
    table_route: Option<PhysicalTableRouteValue>,
    /// None stands for not allocated yet.
    pub region_wal_options: Option<HashMap<RegionNumber, String>>,
}
445
446impl CreateTableData {
447    fn table_ref(&self) -> TableReference<'_> {
448        self.task.table_ref()
449    }
450}