common_meta/ddl/
create_logical_tables.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod check;
16mod metadata;
17mod region_request;
18mod update_metadata;
19
20use api::region::RegionResponse;
21use api::v1::CreateTableExpr;
22use async_trait::async_trait;
23use common_catalog::consts::METRIC_ENGINE;
24use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
25use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
26use common_telemetry::{debug, error, warn};
27use futures::future;
28use serde::{Deserialize, Serialize};
29use snafu::{ensure, ResultExt};
30use store_api::metadata::ColumnMetadata;
31use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
32use store_api::storage::{RegionId, RegionNumber};
33use strum::AsRefStr;
34use table::metadata::{RawTableInfo, TableId};
35
36use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error, sync_follower_regions};
37use crate::ddl::DdlContext;
38use crate::error::{DecodeJsonSnafu, MetadataCorruptionSnafu, Result};
39use crate::key::table_route::TableRouteValue;
40use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
41use crate::metrics;
42use crate::rpc::ddl::CreateTableTask;
43use crate::rpc::router::{find_leaders, RegionRoute};
44
/// Procedure that creates a batch of logical tables on top of one physical table.
///
/// Which step runs next is decided by the [`CreateTablesState`] stored in `data`.
pub struct CreateLogicalTablesProcedure {
    /// Shared DDL handles used by the steps (e.g. the node manager and the
    /// table metadata manager).
    pub context: DdlContext,
    /// Serializable procedure state; written by `dump` and restored by `from_json`.
    pub data: CreateTablesData,
}
49
50impl CreateLogicalTablesProcedure {
    /// Type name reported by `Procedure::type_name` for this procedure.
    pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateLogicalTables";
52
53    pub fn new(
54        tasks: Vec<CreateTableTask>,
55        physical_table_id: TableId,
56        context: DdlContext,
57    ) -> Self {
58        Self {
59            context,
60            data: CreateTablesData {
61                state: CreateTablesState::Prepare,
62                tasks,
63                table_ids_already_exists: vec![],
64                physical_table_id,
65                physical_region_numbers: vec![],
66                physical_columns: vec![],
67            },
68        }
69    }
70
71    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
72        let data = serde_json::from_str(json).context(FromJsonSnafu)?;
73        Ok(Self { context, data })
74    }
75
76    /// On the prepares step, it performs:
77    /// - Checks whether physical table exists.
78    /// - Checks whether logical tables exist.
79    /// - Allocates the table ids.
80    /// - Modify tasks to sort logical columns on their names.
81    ///
82    /// Abort(non-retry):
83    /// - The physical table does not exist.
84    /// - Failed to check whether tables exist.
85    /// - One of logical tables has existing, and the table creation task without setting `create_if_not_exists`.
86    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
87        self.check_input_tasks()?;
88        // Sets physical region numbers
89        self.fill_physical_table_info().await?;
90        // Checks if the tables exist
91        self.check_tables_already_exist().await?;
92
93        // If all tables already exist, returns the table_ids.
94        if self
95            .data
96            .table_ids_already_exists
97            .iter()
98            .all(Option::is_some)
99        {
100            return Ok(Status::done_with_output(
101                self.data
102                    .table_ids_already_exists
103                    .drain(..)
104                    .flatten()
105                    .collect::<Vec<_>>(),
106            ));
107        }
108
109        // Allocates table ids and sort columns on their names.
110        self.allocate_table_ids().await?;
111
112        self.data.state = CreateTablesState::DatanodeCreateRegions;
113        Ok(Status::executing(true))
114    }
115
116    pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
117        let (_, physical_table_route) = self
118            .context
119            .table_metadata_manager
120            .table_route_manager()
121            .get_physical_table_route(self.data.physical_table_id)
122            .await?;
123
124        self.create_regions(&physical_table_route.region_routes)
125            .await
126    }
127
128    /// Creates table metadata for logical tables and update corresponding physical
129    /// table's metadata.
130    ///
131    /// Abort(not-retry):
132    /// - Failed to create table metadata.
133    pub async fn on_create_metadata(&mut self) -> Result<Status> {
134        self.update_physical_table_metadata().await?;
135        let table_ids = self.create_logical_tables_metadata().await?;
136
137        Ok(Status::done_with_output(table_ids))
138    }
139
140    async fn create_regions(&mut self, region_routes: &[RegionRoute]) -> Result<Status> {
141        let leaders = find_leaders(region_routes);
142        let mut create_region_tasks = Vec::with_capacity(leaders.len());
143
144        for peer in leaders {
145            let requester = self.context.node_manager.datanode(&peer).await;
146            let Some(request) = self.make_request(&peer, region_routes)? else {
147                debug!("no region request to send to datanode {}", peer);
148                // We can skip the rest of the datanodes,
149                // the rest of the datanodes should have the same result.
150                break;
151            };
152
153            create_region_tasks.push(async move {
154                requester
155                    .handle(request)
156                    .await
157                    .map_err(add_peer_context_if_needed(peer))
158            });
159        }
160
161        let mut results = future::join_all(create_region_tasks)
162            .await
163            .into_iter()
164            .collect::<Result<Vec<_>>>()?;
165
166        // Collects response from datanodes.
167        let phy_raw_schemas = results
168            .iter_mut()
169            .map(|res| res.extensions.remove(ALTER_PHYSICAL_EXTENSION_KEY))
170            .collect::<Vec<_>>();
171
172        if phy_raw_schemas.is_empty() {
173            self.submit_sync_region_requests(results, region_routes)
174                .await;
175            self.data.state = CreateTablesState::CreateMetadata;
176            return Ok(Status::executing(false));
177        }
178
179        // Verify all the physical schemas are the same
180        // Safety: previous check ensures this vec is not empty
181        let first = phy_raw_schemas.first().unwrap();
182        ensure!(
183            phy_raw_schemas.iter().all(|x| x == first),
184            MetadataCorruptionSnafu {
185                err_msg: "The physical schemas from datanodes are not the same."
186            }
187        );
188
189        // Decodes the physical raw schemas
190        if let Some(phy_raw_schemas) = first {
191            self.data.physical_columns =
192                ColumnMetadata::decode_list(phy_raw_schemas).context(DecodeJsonSnafu)?;
193        } else {
194            warn!("creating logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged");
195        }
196
197        self.submit_sync_region_requests(results, region_routes)
198            .await;
199        self.data.state = CreateTablesState::CreateMetadata;
200
201        Ok(Status::executing(true))
202    }
203
204    async fn submit_sync_region_requests(
205        &self,
206        results: Vec<RegionResponse>,
207        region_routes: &[RegionRoute],
208    ) {
209        if let Err(err) = sync_follower_regions(
210            &self.context,
211            self.data.physical_table_id,
212            results,
213            region_routes,
214            METRIC_ENGINE,
215        )
216        .await
217        {
218            error!(err; "Failed to sync regions for physical table_id: {}",self.data.physical_table_id);
219        }
220    }
221}
222
223#[async_trait]
224impl Procedure for CreateLogicalTablesProcedure {
225    fn type_name(&self) -> &str {
226        Self::TYPE_NAME
227    }
228
229    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
230        let state = &self.data.state;
231
232        let _timer = metrics::METRIC_META_PROCEDURE_CREATE_TABLES
233            .with_label_values(&[state.as_ref()])
234            .start_timer();
235
236        match state {
237            CreateTablesState::Prepare => self.on_prepare().await,
238            CreateTablesState::DatanodeCreateRegions => self.on_datanode_create_regions().await,
239            CreateTablesState::CreateMetadata => self.on_create_metadata().await,
240        }
241        .map_err(handle_retry_error)
242    }
243
244    fn dump(&self) -> ProcedureResult<String> {
245        serde_json::to_string(&self.data).context(ToJsonSnafu)
246    }
247
248    fn lock_key(&self) -> LockKey {
249        // CatalogLock, SchemaLock,
250        // TableLock
251        // TableNameLock(s)
252        let mut lock_key = Vec::with_capacity(2 + 1 + self.data.tasks.len());
253        let table_ref = self.data.tasks[0].table_ref();
254        lock_key.push(CatalogLock::Read(table_ref.catalog).into());
255        lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
256        lock_key.push(TableLock::Write(self.data.physical_table_id).into());
257
258        for task in &self.data.tasks {
259            lock_key.push(
260                TableNameLock::new(
261                    &task.create_table.catalog_name,
262                    &task.create_table.schema_name,
263                    &task.create_table.table_name,
264                )
265                .into(),
266            );
267        }
268        LockKey::new(lock_key)
269    }
270}
271
/// Serializable state of [`CreateLogicalTablesProcedure`].
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateTablesData {
    /// The step the procedure executes next.
    state: CreateTablesState,
    /// The create-table tasks, one per logical table.
    tasks: Vec<CreateTableTask>,
    /// Paired with `tasks` by position: `Some(id)` marks a table that already
    /// exists, `None` marks a table that still has to be created.
    table_ids_already_exists: Vec<Option<TableId>>,
    /// Id of the physical table the logical tables are created on.
    physical_table_id: TableId,
    /// Region numbers of the physical table; used to derive the region ids of
    /// each logical table.
    physical_region_numbers: Vec<RegionNumber>,
    /// Physical columns decoded from the datanodes' region creation responses;
    /// stays empty when the responses carry no physical schema.
    physical_columns: Vec<ColumnMetadata>,
}
281
282impl CreateTablesData {
    /// Returns the current state of the procedure.
    pub fn state(&self) -> &CreateTablesState {
        &self.state
    }
286
287    fn all_create_table_exprs(&self) -> Vec<&CreateTableExpr> {
288        self.tasks
289            .iter()
290            .map(|task| &task.create_table)
291            .collect::<Vec<_>>()
292    }
293
294    /// Returns the remaining tasks.
295    /// The length of tasks must be greater than 0.
296    fn remaining_tasks(&self) -> Vec<(RawTableInfo, TableRouteValue)> {
297        self.tasks
298            .iter()
299            .zip(self.table_ids_already_exists.iter())
300            .flat_map(|(task, table_id)| {
301                if table_id.is_none() {
302                    let table_info = task.table_info.clone();
303                    let region_ids = self
304                        .physical_region_numbers
305                        .iter()
306                        .map(|region_number| {
307                            RegionId::new(table_info.ident.table_id, *region_number)
308                        })
309                        .collect();
310                    let table_route = TableRouteValue::logical(self.physical_table_id, region_ids);
311                    Some((table_info, table_route))
312                } else {
313                    None
314                }
315            })
316            .collect::<Vec<_>>()
317    }
318}
319
/// The steps of [`CreateLogicalTablesProcedure`].
///
/// The variant name (via `AsRefStr`) is also used as the label of the
/// procedure's timing metric.
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)]
pub enum CreateTablesState {
    /// Prepares to create the tables
    Prepare,
    /// Creates regions on the Datanode
    DatanodeCreateRegions,
    /// Creates metadata
    CreateMetadata,
}