common_meta/ddl/create_logical_tables.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15mod check;
16mod metadata;
17mod region_request;
18mod update_metadata;
19
20use api::region::RegionResponse;
21use api::v1::CreateTableExpr;
22use async_trait::async_trait;
23use common_catalog::consts::METRIC_ENGINE;
24use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
25use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
26use common_telemetry::{debug, error, warn};
27use futures::future;
28pub use region_request::create_region_request_builder;
29use serde::{Deserialize, Serialize};
30use snafu::ResultExt;
31use store_api::metadata::ColumnMetadata;
32use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
33use store_api::storage::{RegionId, RegionNumber};
34use strum::AsRefStr;
35use table::metadata::{RawTableInfo, TableId};
36
37use crate::ddl::utils::{
38    add_peer_context_if_needed, extract_column_metadatas, map_to_procedure_error,
39    sync_follower_regions,
40};
41use crate::ddl::DdlContext;
42use crate::error::Result;
43use crate::key::table_route::TableRouteValue;
44use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
45use crate::metrics;
46use crate::rpc::ddl::CreateTableTask;
47use crate::rpc::router::{find_leaders, RegionRoute};
48
49pub struct CreateLogicalTablesProcedure {
50    pub context: DdlContext,
51    pub data: CreateTablesData,
52}
53
54impl CreateLogicalTablesProcedure {
55    pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateLogicalTables";
56
57    pub fn new(
58        tasks: Vec<CreateTableTask>,
59        physical_table_id: TableId,
60        context: DdlContext,
61    ) -> Self {
62        Self {
63            context,
64            data: CreateTablesData {
65                state: CreateTablesState::Prepare,
66                tasks,
67                table_ids_already_exists: vec![],
68                physical_table_id,
69                physical_region_numbers: vec![],
70                physical_columns: vec![],
71            },
72        }
73    }
74
75    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
76        let data = serde_json::from_str(json).context(FromJsonSnafu)?;
77        Ok(Self { context, data })
78    }
79
80    /// On the prepares step, it performs:
81    /// - Checks whether physical table exists.
82    /// - Checks whether logical tables exist.
83    /// - Allocates the table ids.
84    /// - Modify tasks to sort logical columns on their names.
85    ///
86    /// Abort(non-retry):
87    /// - The physical table does not exist.
88    /// - Failed to check whether tables exist.
89    /// - One of logical tables has existing, and the table creation task without setting `create_if_not_exists`.
90    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
91        self.check_input_tasks()?;
92        // Sets physical region numbers
93        self.fill_physical_table_info().await?;
94        // Checks if the tables exist
95        self.check_tables_already_exist().await?;
96
97        // If all tables already exist, returns the table_ids.
98        if self
99            .data
100            .table_ids_already_exists
101            .iter()
102            .all(Option::is_some)
103        {
104            return Ok(Status::done_with_output(
105                self.data
106                    .table_ids_already_exists
107                    .drain(..)
108                    .flatten()
109                    .collect::<Vec<_>>(),
110            ));
111        }
112
113        // Allocates table ids and sort columns on their names.
114        self.allocate_table_ids().await?;
115
116        self.data.state = CreateTablesState::DatanodeCreateRegions;
117        Ok(Status::executing(true))
118    }
119
120    pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
121        let (_, physical_table_route) = self
122            .context
123            .table_metadata_manager
124            .table_route_manager()
125            .get_physical_table_route(self.data.physical_table_id)
126            .await?;
127
128        self.create_regions(&physical_table_route.region_routes)
129            .await
130    }
131
132    /// Creates table metadata for logical tables and update corresponding physical
133    /// table's metadata.
134    ///
135    /// Abort(not-retry):
136    /// - Failed to create table metadata.
137    pub async fn on_create_metadata(&mut self) -> Result<Status> {
138        self.update_physical_table_metadata().await?;
139        let table_ids = self.create_logical_tables_metadata().await?;
140
141        Ok(Status::done_with_output(table_ids))
142    }
143
144    async fn create_regions(&mut self, region_routes: &[RegionRoute]) -> Result<Status> {
145        let leaders = find_leaders(region_routes);
146        let mut create_region_tasks = Vec::with_capacity(leaders.len());
147
148        for peer in leaders {
149            let requester = self.context.node_manager.datanode(&peer).await;
150            let Some(request) = self.make_request(&peer, region_routes)? else {
151                debug!("no region request to send to datanode {}", peer);
152                // We can skip the rest of the datanodes,
153                // the rest of the datanodes should have the same result.
154                break;
155            };
156
157            create_region_tasks.push(async move {
158                requester
159                    .handle(request)
160                    .await
161                    .map_err(add_peer_context_if_needed(peer))
162            });
163        }
164
165        let mut results = future::join_all(create_region_tasks)
166            .await
167            .into_iter()
168            .collect::<Result<Vec<_>>>()?;
169
170        if let Some(column_metadatas) =
171            extract_column_metadatas(&mut results, ALTER_PHYSICAL_EXTENSION_KEY)?
172        {
173            self.data.physical_columns = column_metadatas;
174        } else {
175            warn!("creating logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged");
176        }
177
178        self.submit_sync_region_requests(&results, region_routes)
179            .await;
180        self.data.state = CreateTablesState::CreateMetadata;
181        Ok(Status::executing(true))
182    }
183
184    async fn submit_sync_region_requests(
185        &self,
186        results: &[RegionResponse],
187        region_routes: &[RegionRoute],
188    ) {
189        if let Err(err) = sync_follower_regions(
190            &self.context,
191            self.data.physical_table_id,
192            results,
193            region_routes,
194            METRIC_ENGINE,
195        )
196        .await
197        {
198            error!(err; "Failed to sync regions for physical table_id: {}",self.data.physical_table_id);
199        }
200    }
201}
202
203#[async_trait]
204impl Procedure for CreateLogicalTablesProcedure {
205    fn type_name(&self) -> &str {
206        Self::TYPE_NAME
207    }
208
209    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
210        let state = &self.data.state;
211
212        let _timer = metrics::METRIC_META_PROCEDURE_CREATE_TABLES
213            .with_label_values(&[state.as_ref()])
214            .start_timer();
215
216        match state {
217            CreateTablesState::Prepare => self.on_prepare().await,
218            CreateTablesState::DatanodeCreateRegions => self.on_datanode_create_regions().await,
219            CreateTablesState::CreateMetadata => self.on_create_metadata().await,
220        }
221        .map_err(map_to_procedure_error)
222    }
223
224    fn dump(&self) -> ProcedureResult<String> {
225        serde_json::to_string(&self.data).context(ToJsonSnafu)
226    }
227
228    fn lock_key(&self) -> LockKey {
229        // CatalogLock, SchemaLock,
230        // TableLock
231        // TableNameLock(s)
232        let mut lock_key = Vec::with_capacity(2 + 1 + self.data.tasks.len());
233        let table_ref = self.data.tasks[0].table_ref();
234        lock_key.push(CatalogLock::Read(table_ref.catalog).into());
235        lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
236        lock_key.push(TableLock::Write(self.data.physical_table_id).into());
237
238        for task in &self.data.tasks {
239            lock_key.push(
240                TableNameLock::new(
241                    &task.create_table.catalog_name,
242                    &task.create_table.schema_name,
243                    &task.create_table.table_name,
244                )
245                .into(),
246            );
247        }
248        LockKey::new(lock_key)
249    }
250}
251
252#[derive(Debug, Serialize, Deserialize)]
253pub struct CreateTablesData {
254    state: CreateTablesState,
255    tasks: Vec<CreateTableTask>,
256    table_ids_already_exists: Vec<Option<TableId>>,
257    physical_table_id: TableId,
258    physical_region_numbers: Vec<RegionNumber>,
259    physical_columns: Vec<ColumnMetadata>,
260}
261
262impl CreateTablesData {
263    pub fn state(&self) -> &CreateTablesState {
264        &self.state
265    }
266
267    fn all_create_table_exprs(&self) -> Vec<&CreateTableExpr> {
268        self.tasks
269            .iter()
270            .map(|task| &task.create_table)
271            .collect::<Vec<_>>()
272    }
273
274    /// Returns the remaining tasks.
275    /// The length of tasks must be greater than 0.
276    fn remaining_tasks(&self) -> Vec<(RawTableInfo, TableRouteValue)> {
277        self.tasks
278            .iter()
279            .zip(self.table_ids_already_exists.iter())
280            .flat_map(|(task, table_id)| {
281                if table_id.is_none() {
282                    let table_info = task.table_info.clone();
283                    let region_ids = self
284                        .physical_region_numbers
285                        .iter()
286                        .map(|region_number| {
287                            RegionId::new(table_info.ident.table_id, *region_number)
288                        })
289                        .collect();
290                    let table_route = TableRouteValue::logical(self.physical_table_id, region_ids);
291                    Some((table_info, table_route))
292                } else {
293                    None
294                }
295            })
296            .collect::<Vec<_>>()
297    }
298}
299
300#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)]
301pub enum CreateTablesState {
302    /// Prepares to create the tables
303    Prepare,
304    /// Creates regions on the Datanode
305    DatanodeCreateRegions,
306    /// Creates metadata
307    CreateMetadata,
308}