common_meta/ddl/
create_table.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use api::v1::region::region_request::Body as PbRegionRequest;
18use api::v1::region::{RegionRequest, RegionRequestHeader};
19use async_trait::async_trait;
20use common_error::ext::BoxedError;
21use common_procedure::error::{
22    ExternalSnafu, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
23};
24use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
25use common_telemetry::info;
26use common_telemetry::tracing_context::TracingContext;
27use futures::future::join_all;
28use serde::{Deserialize, Serialize};
29use snafu::{ensure, OptionExt, ResultExt};
30use store_api::storage::{RegionId, RegionNumber};
31use strum::AsRefStr;
32use table::metadata::{RawTableInfo, TableId};
33use table::table_reference::TableReference;
34
35use crate::ddl::create_table_template::{build_template, CreateRequestBuilder};
36use crate::ddl::utils::{
37    add_peer_context_if_needed, convert_region_routes_to_detecting_regions, handle_retry_error,
38    region_storage_path,
39};
40use crate::ddl::{DdlContext, TableMetadata};
41use crate::error::{self, Result};
42use crate::key::table_name::TableNameKey;
43use crate::key::table_route::{PhysicalTableRouteValue, TableRouteValue};
44use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock};
45use crate::metrics;
46use crate::region_keeper::OperatingRegionGuard;
47use crate::rpc::ddl::CreateTableTask;
48use crate::rpc::router::{
49    find_leader_regions, find_leaders, operating_leader_regions, RegionRoute,
50};
/// The procedure that creates a physical table.
///
/// It is driven through the states of [CreateTableState]: it prepares (checks the
/// table name and allocates the table metadata), creates the regions on the
/// datanodes, and finally persists the table metadata.
pub struct CreateTableProcedure {
    /// The DDL context providing the metadata managers, the node manager and the
    /// region keeper used by the steps of this procedure.
    pub context: DdlContext,
    /// Holds the serializable procedure data and the guards of the opening regions.
    pub creator: TableCreator,
}
55
56impl CreateTableProcedure {
57    pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateTable";
58
59    pub fn new(task: CreateTableTask, context: DdlContext) -> Self {
60        Self {
61            context,
62            creator: TableCreator::new(task),
63        }
64    }
65
66    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
67        let data = serde_json::from_str(json).context(FromJsonSnafu)?;
68
69        Ok(CreateTableProcedure {
70            context,
71            creator: TableCreator {
72                data,
73                opening_regions: vec![],
74            },
75        })
76    }
77
78    fn table_info(&self) -> &RawTableInfo {
79        &self.creator.data.task.table_info
80    }
81
82    pub(crate) fn table_id(&self) -> TableId {
83        self.table_info().ident.table_id
84    }
85
86    fn region_wal_options(&self) -> Result<&HashMap<RegionNumber, String>> {
87        self.creator
88            .data
89            .region_wal_options
90            .as_ref()
91            .context(error::UnexpectedSnafu {
92                err_msg: "region_wal_options is not allocated",
93            })
94    }
95
96    fn table_route(&self) -> Result<&PhysicalTableRouteValue> {
97        self.creator
98            .data
99            .table_route
100            .as_ref()
101            .context(error::UnexpectedSnafu {
102                err_msg: "table_route is not allocated",
103            })
104    }
105
106    #[cfg(any(test, feature = "testing"))]
107    pub fn set_allocated_metadata(
108        &mut self,
109        table_id: TableId,
110        table_route: PhysicalTableRouteValue,
111        region_wal_options: HashMap<RegionNumber, String>,
112    ) {
113        self.creator
114            .set_allocated_metadata(table_id, table_route, region_wal_options)
115    }
116
117    /// On the prepare step, it performs:
118    /// - Checks whether the table exists.
119    /// - Allocates the table id.
120    ///
121    /// Abort(non-retry):
122    /// - TableName exists and `create_if_not_exists` is false.
123    /// - Failed to allocate [TableMetadata].
124    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
125        let expr = &self.creator.data.task.create_table;
126        let table_name_value = self
127            .context
128            .table_metadata_manager
129            .table_name_manager()
130            .get(TableNameKey::new(
131                &expr.catalog_name,
132                &expr.schema_name,
133                &expr.table_name,
134            ))
135            .await?;
136
137        if let Some(value) = table_name_value {
138            ensure!(
139                expr.create_if_not_exists,
140                error::TableAlreadyExistsSnafu {
141                    table_name: self.creator.data.table_ref().to_string(),
142                }
143            );
144
145            let table_id = value.table_id();
146            return Ok(Status::done_with_output(table_id));
147        }
148
149        self.creator.data.state = CreateTableState::DatanodeCreateRegions;
150        let TableMetadata {
151            table_id,
152            table_route,
153            region_wal_options,
154        } = self
155            .context
156            .table_metadata_allocator
157            .create(&self.creator.data.task)
158            .await?;
159        self.creator
160            .set_allocated_metadata(table_id, table_route, region_wal_options);
161
162        Ok(Status::executing(true))
163    }
164
165    pub fn new_region_request_builder(
166        &self,
167        physical_table_id: Option<TableId>,
168    ) -> Result<CreateRequestBuilder> {
169        let create_table_expr = &self.creator.data.task.create_table;
170        let template = build_template(create_table_expr)?;
171        Ok(CreateRequestBuilder::new(template, physical_table_id))
172    }
173
174    /// Creates regions on datanodes
175    ///
176    /// Abort(non-retry):
177    /// - Failed to create [CreateRequestBuilder].
178    /// - Failed to get the table route of physical table (for logical table).
179    ///
180    /// Retry:
181    /// - If the underlying servers returns one of the following [Code](tonic::status::Code):
182    ///   - [Code::Cancelled](tonic::status::Code::Cancelled)
183    ///   - [Code::DeadlineExceeded](tonic::status::Code::DeadlineExceeded)
184    ///   - [Code::Unavailable](tonic::status::Code::Unavailable)
185    pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
186        let table_route = self.table_route()?.clone();
187        let request_builder = self.new_region_request_builder(None)?;
188        // Registers opening regions
189        let guards = self
190            .creator
191            .register_opening_regions(&self.context, &table_route.region_routes)?;
192        if !guards.is_empty() {
193            self.creator.opening_regions = guards;
194        }
195        self.create_regions(&table_route.region_routes, request_builder)
196            .await
197    }
198
199    async fn create_regions(
200        &mut self,
201        region_routes: &[RegionRoute],
202        request_builder: CreateRequestBuilder,
203    ) -> Result<Status> {
204        let create_table_data = &self.creator.data;
205        // Safety: the region_wal_options must be allocated
206        let region_wal_options = self.region_wal_options()?;
207        let create_table_expr = &create_table_data.task.create_table;
208        let catalog = &create_table_expr.catalog_name;
209        let schema = &create_table_expr.schema_name;
210        let storage_path = region_storage_path(catalog, schema);
211        let leaders = find_leaders(region_routes);
212        let mut create_region_tasks = Vec::with_capacity(leaders.len());
213
214        for datanode in leaders {
215            let requester = self.context.node_manager.datanode(&datanode).await;
216
217            let regions = find_leader_regions(region_routes, &datanode);
218            let mut requests = Vec::with_capacity(regions.len());
219            for region_number in regions {
220                let region_id = RegionId::new(self.table_id(), region_number);
221                let create_region_request = request_builder.build_one(
222                    region_id,
223                    storage_path.clone(),
224                    region_wal_options,
225                )?;
226                requests.push(PbRegionRequest::Create(create_region_request));
227            }
228
229            for request in requests {
230                let request = RegionRequest {
231                    header: Some(RegionRequestHeader {
232                        tracing_context: TracingContext::from_current_span().to_w3c(),
233                        ..Default::default()
234                    }),
235                    body: Some(request),
236                };
237
238                let datanode = datanode.clone();
239                let requester = requester.clone();
240                create_region_tasks.push(async move {
241                    requester
242                        .handle(request)
243                        .await
244                        .map_err(add_peer_context_if_needed(datanode))
245                });
246            }
247        }
248
249        join_all(create_region_tasks)
250            .await
251            .into_iter()
252            .collect::<Result<Vec<_>>>()?;
253
254        self.creator.data.state = CreateTableState::CreateMetadata;
255
256        // TODO(weny): Add more tests.
257        Ok(Status::executing(true))
258    }
259
260    /// Creates table metadata
261    ///
262    /// Abort(not-retry):
263    /// - Failed to create table metadata.
264    async fn on_create_metadata(&mut self) -> Result<Status> {
265        let table_id = self.table_id();
266        let manager = &self.context.table_metadata_manager;
267
268        let raw_table_info = self.table_info().clone();
269        // Safety: the region_wal_options must be allocated.
270        let region_wal_options = self.region_wal_options()?.clone();
271        // Safety: the table_route must be allocated.
272        let physical_table_route = self.table_route()?.clone();
273        let detecting_regions =
274            convert_region_routes_to_detecting_regions(&physical_table_route.region_routes);
275        let table_route = TableRouteValue::Physical(physical_table_route);
276        manager
277            .create_table_metadata(raw_table_info, table_route, region_wal_options)
278            .await?;
279        self.context
280            .register_failure_detectors(detecting_regions)
281            .await;
282        info!("Created table metadata for table {table_id}");
283
284        self.creator.opening_regions.clear();
285        Ok(Status::done_with_output(table_id))
286    }
287}
288
289#[async_trait]
290impl Procedure for CreateTableProcedure {
291    fn type_name(&self) -> &str {
292        Self::TYPE_NAME
293    }
294
295    fn recover(&mut self) -> ProcedureResult<()> {
296        // Only registers regions if the table route is allocated.
297        if let Some(x) = &self.creator.data.table_route {
298            self.creator.opening_regions = self
299                .creator
300                .register_opening_regions(&self.context, &x.region_routes)
301                .map_err(BoxedError::new)
302                .context(ExternalSnafu {
303                    clean_poisons: false,
304                })?;
305        }
306
307        Ok(())
308    }
309
310    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
311        let state = &self.creator.data.state;
312
313        let _timer = metrics::METRIC_META_PROCEDURE_CREATE_TABLE
314            .with_label_values(&[state.as_ref()])
315            .start_timer();
316
317        match state {
318            CreateTableState::Prepare => self.on_prepare().await,
319            CreateTableState::DatanodeCreateRegions => self.on_datanode_create_regions().await,
320            CreateTableState::CreateMetadata => self.on_create_metadata().await,
321        }
322        .map_err(handle_retry_error)
323    }
324
325    fn dump(&self) -> ProcedureResult<String> {
326        serde_json::to_string(&self.creator.data).context(ToJsonSnafu)
327    }
328
329    fn lock_key(&self) -> LockKey {
330        let table_ref = &self.creator.data.table_ref();
331
332        LockKey::new(vec![
333            CatalogLock::Read(table_ref.catalog).into(),
334            SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
335            TableNameLock::new(table_ref.catalog, table_ref.schema, table_ref.table).into(),
336        ])
337    }
338}
339
/// The state of a table creation: the serializable procedure data plus the
/// in-memory guards of the regions being opened.
pub struct TableCreator {
    /// The serializable data.
    pub data: CreateTableData,
    /// The guards of opening.
    ///
    /// They are not serialized; a procedure restored from JSON starts with none.
    pub opening_regions: Vec<OperatingRegionGuard>,
}
346
347impl TableCreator {
348    pub fn new(task: CreateTableTask) -> Self {
349        Self {
350            data: CreateTableData {
351                state: CreateTableState::Prepare,
352                task,
353                table_route: None,
354                region_wal_options: None,
355            },
356            opening_regions: vec![],
357        }
358    }
359
360    /// Registers and returns the guards of the opening region if they don't exist.
361    fn register_opening_regions(
362        &self,
363        context: &DdlContext,
364        region_routes: &[RegionRoute],
365    ) -> Result<Vec<OperatingRegionGuard>> {
366        let opening_regions = operating_leader_regions(region_routes);
367
368        if self.opening_regions.len() == opening_regions.len() {
369            return Ok(vec![]);
370        }
371
372        let mut opening_region_guards = Vec::with_capacity(opening_regions.len());
373
374        for (region_id, datanode_id) in opening_regions {
375            let guard = context
376                .memory_region_keeper
377                .register(datanode_id, region_id)
378                .context(error::RegionOperatingRaceSnafu {
379                    region_id,
380                    peer_id: datanode_id,
381                })?;
382            opening_region_guards.push(guard);
383        }
384        Ok(opening_region_guards)
385    }
386
387    fn set_allocated_metadata(
388        &mut self,
389        table_id: TableId,
390        table_route: PhysicalTableRouteValue,
391        region_wal_options: HashMap<RegionNumber, String>,
392    ) {
393        self.data.task.table_info.ident.table_id = table_id;
394        self.data.table_route = Some(table_route);
395        self.data.region_wal_options = Some(region_wal_options);
396    }
397}
398
/// The states of the create table procedure, in the order they are executed.
///
/// The state is serialized as part of [CreateTableData], and its name (via
/// `AsRefStr`) is used as the label of the procedure metric.
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
pub enum CreateTableState {
    /// Prepares to create the table
    Prepare,
    /// Creates regions on the Datanode
    DatanodeCreateRegions,
    /// Creates metadata
    CreateMetadata,
}
408
/// The serializable data of the create table procedure.
///
/// It is what the procedure dumps to JSON and what it is restored from.
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateTableData {
    /// The current state of the procedure.
    pub state: CreateTableState,
    /// The task describing the table to create.
    pub task: CreateTableTask,
    /// None stands for not allocated yet.
    table_route: Option<PhysicalTableRouteValue>,
    /// None stands for not allocated yet.
    pub region_wal_options: Option<HashMap<RegionNumber, String>>,
}
418
impl CreateTableData {
    /// Returns the reference of the table to create, as provided by the task.
    fn table_ref(&self) -> TableReference<'_> {
        self.task.table_ref()
    }
}
423}