// common_meta/ddl/create_table.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashMap;
16
17use api::v1::region::region_request::Body as PbRegionRequest;
18use api::v1::region::{RegionRequest, RegionRequestHeader};
19use async_trait::async_trait;
20use common_error::ext::BoxedError;
21use common_procedure::error::{
22    ExternalSnafu, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
23};
24use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
25use common_telemetry::info;
26use common_telemetry::tracing_context::TracingContext;
27use futures::future::join_all;
28use serde::{Deserialize, Serialize};
29use snafu::{ensure, OptionExt, ResultExt};
30use store_api::storage::{RegionId, RegionNumber};
31use strum::AsRefStr;
32use table::metadata::{RawTableInfo, TableId};
33use table::table_reference::TableReference;
34
35use crate::ddl::create_table_template::{build_template, CreateRequestBuilder};
36use crate::ddl::utils::{
37    add_peer_context_if_needed, convert_region_routes_to_detecting_regions, map_to_procedure_error,
38    region_storage_path,
39};
40use crate::ddl::{DdlContext, TableMetadata};
41use crate::error::{self, Result};
42use crate::key::table_name::TableNameKey;
43use crate::key::table_route::{PhysicalTableRouteValue, TableRouteValue};
44use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock};
45use crate::metrics;
46use crate::region_keeper::OperatingRegionGuard;
47use crate::rpc::ddl::CreateTableTask;
48use crate::rpc::router::{
49    find_leader_regions, find_leaders, operating_leader_regions, RegionRoute,
50};
51pub struct CreateTableProcedure {
52    pub context: DdlContext,
53    pub creator: TableCreator,
54}
55
56impl CreateTableProcedure {
57    pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateTable";
58
59    pub fn new(task: CreateTableTask, context: DdlContext) -> Self {
60        Self {
61            context,
62            creator: TableCreator::new(task),
63        }
64    }
65
66    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
67        let data = serde_json::from_str(json).context(FromJsonSnafu)?;
68
69        Ok(CreateTableProcedure {
70            context,
71            creator: TableCreator {
72                data,
73                opening_regions: vec![],
74            },
75        })
76    }
77
78    fn table_info(&self) -> &RawTableInfo {
79        &self.creator.data.task.table_info
80    }
81
82    pub(crate) fn table_id(&self) -> TableId {
83        self.table_info().ident.table_id
84    }
85
86    fn region_wal_options(&self) -> Result<&HashMap<RegionNumber, String>> {
87        self.creator
88            .data
89            .region_wal_options
90            .as_ref()
91            .context(error::UnexpectedSnafu {
92                err_msg: "region_wal_options is not allocated",
93            })
94    }
95
96    fn table_route(&self) -> Result<&PhysicalTableRouteValue> {
97        self.creator
98            .data
99            .table_route
100            .as_ref()
101            .context(error::UnexpectedSnafu {
102                err_msg: "table_route is not allocated",
103            })
104    }
105
106    #[cfg(any(test, feature = "testing"))]
107    pub fn set_allocated_metadata(
108        &mut self,
109        table_id: TableId,
110        table_route: PhysicalTableRouteValue,
111        region_wal_options: HashMap<RegionNumber, String>,
112    ) {
113        self.creator
114            .set_allocated_metadata(table_id, table_route, region_wal_options)
115    }
116
117    /// On the prepare step, it performs:
118    /// - Checks whether the table exists.
119    /// - Allocates the table id.
120    ///
121    /// Abort(non-retry):
122    /// - TableName exists and `create_if_not_exists` is false.
123    /// - Failed to allocate [TableMetadata].
124    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
125        let expr = &self.creator.data.task.create_table;
126        let table_name_value = self
127            .context
128            .table_metadata_manager
129            .table_name_manager()
130            .get(TableNameKey::new(
131                &expr.catalog_name,
132                &expr.schema_name,
133                &expr.table_name,
134            ))
135            .await?;
136
137        if let Some(value) = table_name_value {
138            ensure!(
139                expr.create_if_not_exists,
140                error::TableAlreadyExistsSnafu {
141                    table_name: self.creator.data.table_ref().to_string(),
142                }
143            );
144
145            let table_id = value.table_id();
146            return Ok(Status::done_with_output(table_id));
147        }
148
149        self.creator.data.state = CreateTableState::DatanodeCreateRegions;
150        let TableMetadata {
151            table_id,
152            table_route,
153            region_wal_options,
154        } = self
155            .context
156            .table_metadata_allocator
157            .create(&self.creator.data.task)
158            .await?;
159        self.creator
160            .set_allocated_metadata(table_id, table_route, region_wal_options);
161
162        Ok(Status::executing(true))
163    }
164
165    pub fn new_region_request_builder(
166        &self,
167        physical_table_id: Option<TableId>,
168    ) -> Result<CreateRequestBuilder> {
169        let create_table_expr = &self.creator.data.task.create_table;
170        let template = build_template(create_table_expr)?;
171        Ok(CreateRequestBuilder::new(template, physical_table_id))
172    }
173
174    /// Creates regions on datanodes
175    ///
176    /// Abort(non-retry):
177    /// - Failed to create [CreateRequestBuilder].
178    /// - Failed to get the table route of physical table (for logical table).
179    ///
180    /// Retry:
181    /// - If the underlying servers returns one of the following [Code](tonic::status::Code):
182    ///   - [Code::Cancelled](tonic::status::Code::Cancelled)
183    ///   - [Code::DeadlineExceeded](tonic::status::Code::DeadlineExceeded)
184    ///   - [Code::Unavailable](tonic::status::Code::Unavailable)
185    pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
186        let table_route = self.table_route()?.clone();
187        let request_builder = self.new_region_request_builder(None)?;
188        // Registers opening regions
189        let guards = self
190            .creator
191            .register_opening_regions(&self.context, &table_route.region_routes)?;
192        if !guards.is_empty() {
193            self.creator.opening_regions = guards;
194        }
195        self.create_regions(&table_route.region_routes, request_builder)
196            .await
197    }
198
199    async fn create_regions(
200        &mut self,
201        region_routes: &[RegionRoute],
202        request_builder: CreateRequestBuilder,
203    ) -> Result<Status> {
204        let create_table_data = &self.creator.data;
205        // Safety: the region_wal_options must be allocated
206        let region_wal_options = self.region_wal_options()?;
207        let create_table_expr = &create_table_data.task.create_table;
208        let catalog = &create_table_expr.catalog_name;
209        let schema = &create_table_expr.schema_name;
210        let storage_path = region_storage_path(catalog, schema);
211        let leaders = find_leaders(region_routes);
212        let mut create_region_tasks = Vec::with_capacity(leaders.len());
213
214        for datanode in leaders {
215            let requester = self.context.node_manager.datanode(&datanode).await;
216
217            let regions = find_leader_regions(region_routes, &datanode);
218            let mut requests = Vec::with_capacity(regions.len());
219            for region_number in regions {
220                let region_id = RegionId::new(self.table_id(), region_number);
221                let create_region_request =
222                    request_builder.build_one(region_id, storage_path.clone(), region_wal_options);
223                requests.push(PbRegionRequest::Create(create_region_request));
224            }
225
226            for request in requests {
227                let request = RegionRequest {
228                    header: Some(RegionRequestHeader {
229                        tracing_context: TracingContext::from_current_span().to_w3c(),
230                        ..Default::default()
231                    }),
232                    body: Some(request),
233                };
234
235                let datanode = datanode.clone();
236                let requester = requester.clone();
237                create_region_tasks.push(async move {
238                    requester
239                        .handle(request)
240                        .await
241                        .map_err(add_peer_context_if_needed(datanode))
242                });
243            }
244        }
245
246        join_all(create_region_tasks)
247            .await
248            .into_iter()
249            .collect::<Result<Vec<_>>>()?;
250
251        self.creator.data.state = CreateTableState::CreateMetadata;
252
253        // TODO(weny): Add more tests.
254        Ok(Status::executing(true))
255    }
256
257    /// Creates table metadata
258    ///
259    /// Abort(not-retry):
260    /// - Failed to create table metadata.
261    async fn on_create_metadata(&mut self) -> Result<Status> {
262        let table_id = self.table_id();
263        let manager = &self.context.table_metadata_manager;
264
265        let raw_table_info = self.table_info().clone();
266        // Safety: the region_wal_options must be allocated.
267        let region_wal_options = self.region_wal_options()?.clone();
268        // Safety: the table_route must be allocated.
269        let physical_table_route = self.table_route()?.clone();
270        let detecting_regions =
271            convert_region_routes_to_detecting_regions(&physical_table_route.region_routes);
272        let table_route = TableRouteValue::Physical(physical_table_route);
273        manager
274            .create_table_metadata(raw_table_info, table_route, region_wal_options)
275            .await?;
276        self.context
277            .register_failure_detectors(detecting_regions)
278            .await;
279        info!("Created table metadata for table {table_id}");
280
281        self.creator.opening_regions.clear();
282        Ok(Status::done_with_output(table_id))
283    }
284}
285
286#[async_trait]
287impl Procedure for CreateTableProcedure {
288    fn type_name(&self) -> &str {
289        Self::TYPE_NAME
290    }
291
292    fn recover(&mut self) -> ProcedureResult<()> {
293        // Only registers regions if the table route is allocated.
294        if let Some(x) = &self.creator.data.table_route {
295            self.creator.opening_regions = self
296                .creator
297                .register_opening_regions(&self.context, &x.region_routes)
298                .map_err(BoxedError::new)
299                .context(ExternalSnafu {
300                    clean_poisons: false,
301                })?;
302        }
303
304        Ok(())
305    }
306
307    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
308        let state = &self.creator.data.state;
309
310        let _timer = metrics::METRIC_META_PROCEDURE_CREATE_TABLE
311            .with_label_values(&[state.as_ref()])
312            .start_timer();
313
314        match state {
315            CreateTableState::Prepare => self.on_prepare().await,
316            CreateTableState::DatanodeCreateRegions => self.on_datanode_create_regions().await,
317            CreateTableState::CreateMetadata => self.on_create_metadata().await,
318        }
319        .map_err(map_to_procedure_error)
320    }
321
322    fn dump(&self) -> ProcedureResult<String> {
323        serde_json::to_string(&self.creator.data).context(ToJsonSnafu)
324    }
325
326    fn lock_key(&self) -> LockKey {
327        let table_ref = &self.creator.data.table_ref();
328
329        LockKey::new(vec![
330            CatalogLock::Read(table_ref.catalog).into(),
331            SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
332            TableNameLock::new(table_ref.catalog, table_ref.schema, table_ref.table).into(),
333        ])
334    }
335}
336
337pub struct TableCreator {
338    /// The serializable data.
339    pub data: CreateTableData,
340    /// The guards of opening.
341    pub opening_regions: Vec<OperatingRegionGuard>,
342}
343
344impl TableCreator {
345    pub fn new(task: CreateTableTask) -> Self {
346        Self {
347            data: CreateTableData {
348                state: CreateTableState::Prepare,
349                task,
350                table_route: None,
351                region_wal_options: None,
352            },
353            opening_regions: vec![],
354        }
355    }
356
357    /// Registers and returns the guards of the opening region if they don't exist.
358    fn register_opening_regions(
359        &self,
360        context: &DdlContext,
361        region_routes: &[RegionRoute],
362    ) -> Result<Vec<OperatingRegionGuard>> {
363        let opening_regions = operating_leader_regions(region_routes);
364
365        if self.opening_regions.len() == opening_regions.len() {
366            return Ok(vec![]);
367        }
368
369        let mut opening_region_guards = Vec::with_capacity(opening_regions.len());
370
371        for (region_id, datanode_id) in opening_regions {
372            let guard = context
373                .memory_region_keeper
374                .register(datanode_id, region_id)
375                .context(error::RegionOperatingRaceSnafu {
376                    region_id,
377                    peer_id: datanode_id,
378                })?;
379            opening_region_guards.push(guard);
380        }
381        Ok(opening_region_guards)
382    }
383
384    fn set_allocated_metadata(
385        &mut self,
386        table_id: TableId,
387        table_route: PhysicalTableRouteValue,
388        region_wal_options: HashMap<RegionNumber, String>,
389    ) {
390        self.data.task.table_info.ident.table_id = table_id;
391        self.data.table_route = Some(table_route);
392        self.data.region_wal_options = Some(region_wal_options);
393    }
394}
395
396#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
397pub enum CreateTableState {
398    /// Prepares to create the table
399    Prepare,
400    /// Creates regions on the Datanode
401    DatanodeCreateRegions,
402    /// Creates metadata
403    CreateMetadata,
404}
405
406#[derive(Debug, Serialize, Deserialize)]
407pub struct CreateTableData {
408    pub state: CreateTableState,
409    pub task: CreateTableTask,
410    /// None stands for not allocated yet.
411    table_route: Option<PhysicalTableRouteValue>,
412    /// None stands for not allocated yet.
413    pub region_wal_options: Option<HashMap<RegionNumber, String>>,
414}
415
416impl CreateTableData {
417    fn table_ref(&self) -> TableReference<'_> {
418        self.task.table_ref()
419    }
420}