common_meta/ddl/
create_logical_tables.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15mod check;
16mod metadata;
17mod region_request;
18mod update_metadata;
19
20use api::region::RegionResponse;
21use api::v1::CreateTableExpr;
22use async_trait::async_trait;
23use common_catalog::consts::METRIC_ENGINE;
24use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
25use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
26use common_telemetry::{debug, error, warn};
27use futures::future;
28use serde::{Deserialize, Serialize};
29use snafu::{ensure, ResultExt};
30use store_api::metadata::ColumnMetadata;
31use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
32use store_api::storage::{RegionId, RegionNumber};
33use strum::AsRefStr;
34use table::metadata::{RawTableInfo, TableId};
35
36use crate::ddl::utils::{
37    add_peer_context_if_needed, map_to_procedure_error, sync_follower_regions,
38};
39use crate::ddl::DdlContext;
40use crate::error::{DecodeJsonSnafu, MetadataCorruptionSnafu, Result};
41use crate::key::table_route::TableRouteValue;
42use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
43use crate::metrics;
44use crate::rpc::ddl::CreateTableTask;
45use crate::rpc::router::{find_leaders, RegionRoute};
46
47pub struct CreateLogicalTablesProcedure {
48    pub context: DdlContext,
49    pub data: CreateTablesData,
50}
51
52impl CreateLogicalTablesProcedure {
53    pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateLogicalTables";
54
55    pub fn new(
56        tasks: Vec<CreateTableTask>,
57        physical_table_id: TableId,
58        context: DdlContext,
59    ) -> Self {
60        Self {
61            context,
62            data: CreateTablesData {
63                state: CreateTablesState::Prepare,
64                tasks,
65                table_ids_already_exists: vec![],
66                physical_table_id,
67                physical_region_numbers: vec![],
68                physical_columns: vec![],
69            },
70        }
71    }
72
73    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
74        let data = serde_json::from_str(json).context(FromJsonSnafu)?;
75        Ok(Self { context, data })
76    }
77
78    /// On the prepares step, it performs:
79    /// - Checks whether physical table exists.
80    /// - Checks whether logical tables exist.
81    /// - Allocates the table ids.
82    /// - Modify tasks to sort logical columns on their names.
83    ///
84    /// Abort(non-retry):
85    /// - The physical table does not exist.
86    /// - Failed to check whether tables exist.
87    /// - One of logical tables has existing, and the table creation task without setting `create_if_not_exists`.
88    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
89        self.check_input_tasks()?;
90        // Sets physical region numbers
91        self.fill_physical_table_info().await?;
92        // Checks if the tables exist
93        self.check_tables_already_exist().await?;
94
95        // If all tables already exist, returns the table_ids.
96        if self
97            .data
98            .table_ids_already_exists
99            .iter()
100            .all(Option::is_some)
101        {
102            return Ok(Status::done_with_output(
103                self.data
104                    .table_ids_already_exists
105                    .drain(..)
106                    .flatten()
107                    .collect::<Vec<_>>(),
108            ));
109        }
110
111        // Allocates table ids and sort columns on their names.
112        self.allocate_table_ids().await?;
113
114        self.data.state = CreateTablesState::DatanodeCreateRegions;
115        Ok(Status::executing(true))
116    }
117
118    pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
119        let (_, physical_table_route) = self
120            .context
121            .table_metadata_manager
122            .table_route_manager()
123            .get_physical_table_route(self.data.physical_table_id)
124            .await?;
125
126        self.create_regions(&physical_table_route.region_routes)
127            .await
128    }
129
130    /// Creates table metadata for logical tables and update corresponding physical
131    /// table's metadata.
132    ///
133    /// Abort(not-retry):
134    /// - Failed to create table metadata.
135    pub async fn on_create_metadata(&mut self) -> Result<Status> {
136        self.update_physical_table_metadata().await?;
137        let table_ids = self.create_logical_tables_metadata().await?;
138
139        Ok(Status::done_with_output(table_ids))
140    }
141
142    async fn create_regions(&mut self, region_routes: &[RegionRoute]) -> Result<Status> {
143        let leaders = find_leaders(region_routes);
144        let mut create_region_tasks = Vec::with_capacity(leaders.len());
145
146        for peer in leaders {
147            let requester = self.context.node_manager.datanode(&peer).await;
148            let Some(request) = self.make_request(&peer, region_routes)? else {
149                debug!("no region request to send to datanode {}", peer);
150                // We can skip the rest of the datanodes,
151                // the rest of the datanodes should have the same result.
152                break;
153            };
154
155            create_region_tasks.push(async move {
156                requester
157                    .handle(request)
158                    .await
159                    .map_err(add_peer_context_if_needed(peer))
160            });
161        }
162
163        let mut results = future::join_all(create_region_tasks)
164            .await
165            .into_iter()
166            .collect::<Result<Vec<_>>>()?;
167
168        // Collects response from datanodes.
169        let phy_raw_schemas = results
170            .iter_mut()
171            .map(|res| res.extensions.remove(ALTER_PHYSICAL_EXTENSION_KEY))
172            .collect::<Vec<_>>();
173
174        if phy_raw_schemas.is_empty() {
175            self.submit_sync_region_requests(results, region_routes)
176                .await;
177            self.data.state = CreateTablesState::CreateMetadata;
178            return Ok(Status::executing(false));
179        }
180
181        // Verify all the physical schemas are the same
182        // Safety: previous check ensures this vec is not empty
183        let first = phy_raw_schemas.first().unwrap();
184        ensure!(
185            phy_raw_schemas.iter().all(|x| x == first),
186            MetadataCorruptionSnafu {
187                err_msg: "The physical schemas from datanodes are not the same."
188            }
189        );
190
191        // Decodes the physical raw schemas
192        if let Some(phy_raw_schemas) = first {
193            self.data.physical_columns =
194                ColumnMetadata::decode_list(phy_raw_schemas).context(DecodeJsonSnafu)?;
195        } else {
196            warn!("creating logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged");
197        }
198
199        self.submit_sync_region_requests(results, region_routes)
200            .await;
201        self.data.state = CreateTablesState::CreateMetadata;
202
203        Ok(Status::executing(true))
204    }
205
206    async fn submit_sync_region_requests(
207        &self,
208        results: Vec<RegionResponse>,
209        region_routes: &[RegionRoute],
210    ) {
211        if let Err(err) = sync_follower_regions(
212            &self.context,
213            self.data.physical_table_id,
214            results,
215            region_routes,
216            METRIC_ENGINE,
217        )
218        .await
219        {
220            error!(err; "Failed to sync regions for physical table_id: {}",self.data.physical_table_id);
221        }
222    }
223}
224
225#[async_trait]
226impl Procedure for CreateLogicalTablesProcedure {
227    fn type_name(&self) -> &str {
228        Self::TYPE_NAME
229    }
230
231    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
232        let state = &self.data.state;
233
234        let _timer = metrics::METRIC_META_PROCEDURE_CREATE_TABLES
235            .with_label_values(&[state.as_ref()])
236            .start_timer();
237
238        match state {
239            CreateTablesState::Prepare => self.on_prepare().await,
240            CreateTablesState::DatanodeCreateRegions => self.on_datanode_create_regions().await,
241            CreateTablesState::CreateMetadata => self.on_create_metadata().await,
242        }
243        .map_err(map_to_procedure_error)
244    }
245
246    fn dump(&self) -> ProcedureResult<String> {
247        serde_json::to_string(&self.data).context(ToJsonSnafu)
248    }
249
250    fn lock_key(&self) -> LockKey {
251        // CatalogLock, SchemaLock,
252        // TableLock
253        // TableNameLock(s)
254        let mut lock_key = Vec::with_capacity(2 + 1 + self.data.tasks.len());
255        let table_ref = self.data.tasks[0].table_ref();
256        lock_key.push(CatalogLock::Read(table_ref.catalog).into());
257        lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
258        lock_key.push(TableLock::Write(self.data.physical_table_id).into());
259
260        for task in &self.data.tasks {
261            lock_key.push(
262                TableNameLock::new(
263                    &task.create_table.catalog_name,
264                    &task.create_table.schema_name,
265                    &task.create_table.table_name,
266                )
267                .into(),
268            );
269        }
270        LockKey::new(lock_key)
271    }
272}
273
274#[derive(Debug, Serialize, Deserialize)]
275pub struct CreateTablesData {
276    state: CreateTablesState,
277    tasks: Vec<CreateTableTask>,
278    table_ids_already_exists: Vec<Option<TableId>>,
279    physical_table_id: TableId,
280    physical_region_numbers: Vec<RegionNumber>,
281    physical_columns: Vec<ColumnMetadata>,
282}
283
284impl CreateTablesData {
285    pub fn state(&self) -> &CreateTablesState {
286        &self.state
287    }
288
289    fn all_create_table_exprs(&self) -> Vec<&CreateTableExpr> {
290        self.tasks
291            .iter()
292            .map(|task| &task.create_table)
293            .collect::<Vec<_>>()
294    }
295
296    /// Returns the remaining tasks.
297    /// The length of tasks must be greater than 0.
298    fn remaining_tasks(&self) -> Vec<(RawTableInfo, TableRouteValue)> {
299        self.tasks
300            .iter()
301            .zip(self.table_ids_already_exists.iter())
302            .flat_map(|(task, table_id)| {
303                if table_id.is_none() {
304                    let table_info = task.table_info.clone();
305                    let region_ids = self
306                        .physical_region_numbers
307                        .iter()
308                        .map(|region_number| {
309                            RegionId::new(table_info.ident.table_id, *region_number)
310                        })
311                        .collect();
312                    let table_route = TableRouteValue::logical(self.physical_table_id, region_ids);
313                    Some((table_info, table_route))
314                } else {
315                    None
316                }
317            })
318            .collect::<Vec<_>>()
319    }
320}
321
322#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)]
323pub enum CreateTablesState {
324    /// Prepares to create the tables
325    Prepare,
326    /// Creates regions on the Datanode
327    DatanodeCreateRegions,
328    /// Creates metadata
329    CreateMetadata,
330}