common_meta/ddl/
create_logical_tables.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod check;
16mod metadata;
17mod region_request;
18mod update_metadata;
19
20use api::region::RegionResponse;
21use api::v1::CreateTableExpr;
22use async_trait::async_trait;
23use common_catalog::consts::METRIC_ENGINE;
24use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
25use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
26use common_telemetry::{debug, error, warn};
27use futures::future;
28pub use region_request::create_region_request_builder;
29use serde::{Deserialize, Serialize};
30use snafu::ResultExt;
31use store_api::metadata::ColumnMetadata;
32use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
33use store_api::storage::{RegionId, RegionNumber};
34use strum::AsRefStr;
35use table::metadata::{RawTableInfo, TableId};
36
37use crate::ddl::utils::{
38    add_peer_context_if_needed, extract_column_metadatas, map_to_procedure_error,
39    sync_follower_regions,
40};
41use crate::ddl::DdlContext;
42use crate::error::Result;
43use crate::key::table_route::TableRouteValue;
44use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
45use crate::metrics;
46use crate::rpc::ddl::CreateTableTask;
47use crate::rpc::router::{find_leaders, RegionRoute};
48
49pub struct CreateLogicalTablesProcedure {
50    pub context: DdlContext,
51    pub data: CreateTablesData,
52}
53
54impl CreateLogicalTablesProcedure {
55    pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateLogicalTables";
56
57    pub fn new(
58        tasks: Vec<CreateTableTask>,
59        physical_table_id: TableId,
60        context: DdlContext,
61    ) -> Self {
62        Self {
63            context,
64            data: CreateTablesData {
65                state: CreateTablesState::Prepare,
66                tasks,
67                table_ids_already_exists: vec![],
68                physical_table_id,
69                physical_region_numbers: vec![],
70                physical_columns: vec![],
71                physical_partition_columns: vec![],
72            },
73        }
74    }
75
76    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
77        let data = serde_json::from_str(json).context(FromJsonSnafu)?;
78        Ok(Self { context, data })
79    }
80
81    /// On the prepares step, it performs:
82    /// - Checks whether physical table exists.
83    /// - Checks whether logical tables exist.
84    /// - Allocates the table ids.
85    /// - Modify tasks to sort logical columns on their names.
86    ///
87    /// Abort(non-retry):
88    /// - The physical table does not exist.
89    /// - Failed to check whether tables exist.
90    /// - One of logical tables has existing, and the table creation task without setting `create_if_not_exists`.
91    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
92        self.check_input_tasks()?;
93        // Sets physical region numbers
94        self.fill_physical_table_info().await?;
95        // Add partition columns from physical table to logical table schemas
96        self.merge_partition_columns_into_logical_tables()?;
97        // Checks if the tables exist
98        self.check_tables_already_exist().await?;
99
100        // If all tables already exist, returns the table_ids.
101        if self
102            .data
103            .table_ids_already_exists
104            .iter()
105            .all(Option::is_some)
106        {
107            return Ok(Status::done_with_output(
108                self.data
109                    .table_ids_already_exists
110                    .drain(..)
111                    .flatten()
112                    .collect::<Vec<_>>(),
113            ));
114        }
115
116        // Allocates table ids and sort columns on their names.
117        self.allocate_table_ids().await?;
118
119        self.data.state = CreateTablesState::DatanodeCreateRegions;
120        Ok(Status::executing(true))
121    }
122
123    pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
124        let (_, physical_table_route) = self
125            .context
126            .table_metadata_manager
127            .table_route_manager()
128            .get_physical_table_route(self.data.physical_table_id)
129            .await?;
130
131        self.create_regions(&physical_table_route.region_routes)
132            .await
133    }
134
135    /// Creates table metadata for logical tables and update corresponding physical
136    /// table's metadata.
137    ///
138    /// Abort(not-retry):
139    /// - Failed to create table metadata.
140    pub async fn on_create_metadata(&mut self) -> Result<Status> {
141        self.update_physical_table_metadata().await?;
142        let table_ids = self.create_logical_tables_metadata().await?;
143
144        Ok(Status::done_with_output(table_ids))
145    }
146
147    async fn create_regions(&mut self, region_routes: &[RegionRoute]) -> Result<Status> {
148        let leaders = find_leaders(region_routes);
149        let mut create_region_tasks = Vec::with_capacity(leaders.len());
150
151        for peer in leaders {
152            let requester = self.context.node_manager.datanode(&peer).await;
153            let Some(request) = self.make_request(&peer, region_routes)? else {
154                debug!("no region request to send to datanode {}", peer);
155                // We can skip the rest of the datanodes,
156                // the rest of the datanodes should have the same result.
157                break;
158            };
159
160            create_region_tasks.push(async move {
161                requester
162                    .handle(request)
163                    .await
164                    .map_err(add_peer_context_if_needed(peer))
165            });
166        }
167
168        let mut results = future::join_all(create_region_tasks)
169            .await
170            .into_iter()
171            .collect::<Result<Vec<_>>>()?;
172
173        if let Some(column_metadatas) =
174            extract_column_metadatas(&mut results, ALTER_PHYSICAL_EXTENSION_KEY)?
175        {
176            self.data.physical_columns = column_metadatas;
177        } else {
178            warn!("creating logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged");
179        }
180
181        self.submit_sync_region_requests(&results, region_routes)
182            .await;
183        self.data.state = CreateTablesState::CreateMetadata;
184        Ok(Status::executing(true))
185    }
186
187    async fn submit_sync_region_requests(
188        &self,
189        results: &[RegionResponse],
190        region_routes: &[RegionRoute],
191    ) {
192        if let Err(err) = sync_follower_regions(
193            &self.context,
194            self.data.physical_table_id,
195            results,
196            region_routes,
197            METRIC_ENGINE,
198        )
199        .await
200        {
201            error!(err; "Failed to sync regions for physical table_id: {}",self.data.physical_table_id);
202        }
203    }
204}
205
206#[async_trait]
207impl Procedure for CreateLogicalTablesProcedure {
208    fn type_name(&self) -> &str {
209        Self::TYPE_NAME
210    }
211
212    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
213        let state = &self.data.state;
214
215        let _timer = metrics::METRIC_META_PROCEDURE_CREATE_TABLES
216            .with_label_values(&[state.as_ref()])
217            .start_timer();
218
219        match state {
220            CreateTablesState::Prepare => self.on_prepare().await,
221            CreateTablesState::DatanodeCreateRegions => self.on_datanode_create_regions().await,
222            CreateTablesState::CreateMetadata => self.on_create_metadata().await,
223        }
224        .map_err(map_to_procedure_error)
225    }
226
227    fn dump(&self) -> ProcedureResult<String> {
228        serde_json::to_string(&self.data).context(ToJsonSnafu)
229    }
230
231    fn lock_key(&self) -> LockKey {
232        // CatalogLock, SchemaLock,
233        // TableLock
234        // TableNameLock(s)
235        let mut lock_key = Vec::with_capacity(2 + 1 + self.data.tasks.len());
236        let table_ref = self.data.tasks[0].table_ref();
237        lock_key.push(CatalogLock::Read(table_ref.catalog).into());
238        lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
239        lock_key.push(TableLock::Write(self.data.physical_table_id).into());
240
241        for task in &self.data.tasks {
242            lock_key.push(
243                TableNameLock::new(
244                    &task.create_table.catalog_name,
245                    &task.create_table.schema_name,
246                    &task.create_table.table_name,
247                )
248                .into(),
249            );
250        }
251        LockKey::new(lock_key)
252    }
253}
254
255#[derive(Debug, Serialize, Deserialize)]
256pub struct CreateTablesData {
257    state: CreateTablesState,
258    tasks: Vec<CreateTableTask>,
259    table_ids_already_exists: Vec<Option<TableId>>,
260    physical_table_id: TableId,
261    physical_region_numbers: Vec<RegionNumber>,
262    physical_columns: Vec<ColumnMetadata>,
263    physical_partition_columns: Vec<String>,
264}
265
266impl CreateTablesData {
267    pub fn state(&self) -> &CreateTablesState {
268        &self.state
269    }
270
271    fn all_create_table_exprs(&self) -> Vec<&CreateTableExpr> {
272        self.tasks
273            .iter()
274            .map(|task| &task.create_table)
275            .collect::<Vec<_>>()
276    }
277
278    /// Returns the remaining tasks.
279    /// The length of tasks must be greater than 0.
280    fn remaining_tasks(&self) -> Vec<(RawTableInfo, TableRouteValue)> {
281        self.tasks
282            .iter()
283            .zip(self.table_ids_already_exists.iter())
284            .flat_map(|(task, table_id)| {
285                if table_id.is_none() {
286                    let table_info = task.table_info.clone();
287                    let region_ids = self
288                        .physical_region_numbers
289                        .iter()
290                        .map(|region_number| {
291                            RegionId::new(table_info.ident.table_id, *region_number)
292                        })
293                        .collect();
294                    let table_route = TableRouteValue::logical(self.physical_table_id, region_ids);
295                    Some((table_info, table_route))
296                } else {
297                    None
298                }
299            })
300            .collect::<Vec<_>>()
301    }
302}
303
304#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)]
305pub enum CreateTablesState {
306    /// Prepares to create the tables
307    Prepare,
308    /// Creates regions on the Datanode
309    DatanodeCreateRegions,
310    /// Creates metadata
311    CreateMetadata,
312}