common_meta/ddl/
create_logical_tables.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15mod check;
16mod metadata;
17mod region_request;
18mod update_metadata;
19
20use api::region::RegionResponse;
21use api::v1::CreateTableExpr;
22use async_trait::async_trait;
23use common_catalog::consts::METRIC_ENGINE;
24use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
25use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
26use common_telemetry::{debug, error, warn};
27use futures::future;
28pub use region_request::create_region_request_builder;
29use serde::{Deserialize, Serialize};
30use snafu::{ensure, ResultExt};
31use store_api::metadata::ColumnMetadata;
32use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
33use store_api::storage::{RegionId, RegionNumber};
34use strum::AsRefStr;
35use table::metadata::{RawTableInfo, TableId};
36
37use crate::ddl::utils::{
38    add_peer_context_if_needed, map_to_procedure_error, sync_follower_regions,
39};
40use crate::ddl::DdlContext;
41use crate::error::{DecodeJsonSnafu, MetadataCorruptionSnafu, Result};
42use crate::key::table_route::TableRouteValue;
43use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
44use crate::metrics;
45use crate::rpc::ddl::CreateTableTask;
46use crate::rpc::router::{find_leaders, RegionRoute};
47
48pub struct CreateLogicalTablesProcedure {
49    pub context: DdlContext,
50    pub data: CreateTablesData,
51}
52
53impl CreateLogicalTablesProcedure {
54    pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateLogicalTables";
55
56    pub fn new(
57        tasks: Vec<CreateTableTask>,
58        physical_table_id: TableId,
59        context: DdlContext,
60    ) -> Self {
61        Self {
62            context,
63            data: CreateTablesData {
64                state: CreateTablesState::Prepare,
65                tasks,
66                table_ids_already_exists: vec![],
67                physical_table_id,
68                physical_region_numbers: vec![],
69                physical_columns: vec![],
70            },
71        }
72    }
73
74    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
75        let data = serde_json::from_str(json).context(FromJsonSnafu)?;
76        Ok(Self { context, data })
77    }
78
79    /// On the prepares step, it performs:
80    /// - Checks whether physical table exists.
81    /// - Checks whether logical tables exist.
82    /// - Allocates the table ids.
83    /// - Modify tasks to sort logical columns on their names.
84    ///
85    /// Abort(non-retry):
86    /// - The physical table does not exist.
87    /// - Failed to check whether tables exist.
88    /// - One of logical tables has existing, and the table creation task without setting `create_if_not_exists`.
89    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
90        self.check_input_tasks()?;
91        // Sets physical region numbers
92        self.fill_physical_table_info().await?;
93        // Checks if the tables exist
94        self.check_tables_already_exist().await?;
95
96        // If all tables already exist, returns the table_ids.
97        if self
98            .data
99            .table_ids_already_exists
100            .iter()
101            .all(Option::is_some)
102        {
103            return Ok(Status::done_with_output(
104                self.data
105                    .table_ids_already_exists
106                    .drain(..)
107                    .flatten()
108                    .collect::<Vec<_>>(),
109            ));
110        }
111
112        // Allocates table ids and sort columns on their names.
113        self.allocate_table_ids().await?;
114
115        self.data.state = CreateTablesState::DatanodeCreateRegions;
116        Ok(Status::executing(true))
117    }
118
119    pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
120        let (_, physical_table_route) = self
121            .context
122            .table_metadata_manager
123            .table_route_manager()
124            .get_physical_table_route(self.data.physical_table_id)
125            .await?;
126
127        self.create_regions(&physical_table_route.region_routes)
128            .await
129    }
130
131    /// Creates table metadata for logical tables and update corresponding physical
132    /// table's metadata.
133    ///
134    /// Abort(not-retry):
135    /// - Failed to create table metadata.
136    pub async fn on_create_metadata(&mut self) -> Result<Status> {
137        self.update_physical_table_metadata().await?;
138        let table_ids = self.create_logical_tables_metadata().await?;
139
140        Ok(Status::done_with_output(table_ids))
141    }
142
143    async fn create_regions(&mut self, region_routes: &[RegionRoute]) -> Result<Status> {
144        let leaders = find_leaders(region_routes);
145        let mut create_region_tasks = Vec::with_capacity(leaders.len());
146
147        for peer in leaders {
148            let requester = self.context.node_manager.datanode(&peer).await;
149            let Some(request) = self.make_request(&peer, region_routes)? else {
150                debug!("no region request to send to datanode {}", peer);
151                // We can skip the rest of the datanodes,
152                // the rest of the datanodes should have the same result.
153                break;
154            };
155
156            create_region_tasks.push(async move {
157                requester
158                    .handle(request)
159                    .await
160                    .map_err(add_peer_context_if_needed(peer))
161            });
162        }
163
164        let mut results = future::join_all(create_region_tasks)
165            .await
166            .into_iter()
167            .collect::<Result<Vec<_>>>()?;
168
169        // Collects response from datanodes.
170        let phy_raw_schemas = results
171            .iter_mut()
172            .map(|res| res.extensions.remove(ALTER_PHYSICAL_EXTENSION_KEY))
173            .collect::<Vec<_>>();
174
175        if phy_raw_schemas.is_empty() {
176            self.submit_sync_region_requests(results, region_routes)
177                .await;
178            self.data.state = CreateTablesState::CreateMetadata;
179            return Ok(Status::executing(false));
180        }
181
182        // Verify all the physical schemas are the same
183        // Safety: previous check ensures this vec is not empty
184        let first = phy_raw_schemas.first().unwrap();
185        ensure!(
186            phy_raw_schemas.iter().all(|x| x == first),
187            MetadataCorruptionSnafu {
188                err_msg: "The physical schemas from datanodes are not the same."
189            }
190        );
191
192        // Decodes the physical raw schemas
193        if let Some(phy_raw_schemas) = first {
194            self.data.physical_columns =
195                ColumnMetadata::decode_list(phy_raw_schemas).context(DecodeJsonSnafu)?;
196        } else {
197            warn!("creating logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged");
198        }
199
200        self.submit_sync_region_requests(results, region_routes)
201            .await;
202        self.data.state = CreateTablesState::CreateMetadata;
203
204        Ok(Status::executing(true))
205    }
206
207    async fn submit_sync_region_requests(
208        &self,
209        results: Vec<RegionResponse>,
210        region_routes: &[RegionRoute],
211    ) {
212        if let Err(err) = sync_follower_regions(
213            &self.context,
214            self.data.physical_table_id,
215            results,
216            region_routes,
217            METRIC_ENGINE,
218        )
219        .await
220        {
221            error!(err; "Failed to sync regions for physical table_id: {}",self.data.physical_table_id);
222        }
223    }
224}
225
226#[async_trait]
227impl Procedure for CreateLogicalTablesProcedure {
228    fn type_name(&self) -> &str {
229        Self::TYPE_NAME
230    }
231
232    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
233        let state = &self.data.state;
234
235        let _timer = metrics::METRIC_META_PROCEDURE_CREATE_TABLES
236            .with_label_values(&[state.as_ref()])
237            .start_timer();
238
239        match state {
240            CreateTablesState::Prepare => self.on_prepare().await,
241            CreateTablesState::DatanodeCreateRegions => self.on_datanode_create_regions().await,
242            CreateTablesState::CreateMetadata => self.on_create_metadata().await,
243        }
244        .map_err(map_to_procedure_error)
245    }
246
247    fn dump(&self) -> ProcedureResult<String> {
248        serde_json::to_string(&self.data).context(ToJsonSnafu)
249    }
250
251    fn lock_key(&self) -> LockKey {
252        // CatalogLock, SchemaLock,
253        // TableLock
254        // TableNameLock(s)
255        let mut lock_key = Vec::with_capacity(2 + 1 + self.data.tasks.len());
256        let table_ref = self.data.tasks[0].table_ref();
257        lock_key.push(CatalogLock::Read(table_ref.catalog).into());
258        lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
259        lock_key.push(TableLock::Write(self.data.physical_table_id).into());
260
261        for task in &self.data.tasks {
262            lock_key.push(
263                TableNameLock::new(
264                    &task.create_table.catalog_name,
265                    &task.create_table.schema_name,
266                    &task.create_table.table_name,
267                )
268                .into(),
269            );
270        }
271        LockKey::new(lock_key)
272    }
273}
274
275#[derive(Debug, Serialize, Deserialize)]
276pub struct CreateTablesData {
277    state: CreateTablesState,
278    tasks: Vec<CreateTableTask>,
279    table_ids_already_exists: Vec<Option<TableId>>,
280    physical_table_id: TableId,
281    physical_region_numbers: Vec<RegionNumber>,
282    physical_columns: Vec<ColumnMetadata>,
283}
284
285impl CreateTablesData {
286    pub fn state(&self) -> &CreateTablesState {
287        &self.state
288    }
289
290    fn all_create_table_exprs(&self) -> Vec<&CreateTableExpr> {
291        self.tasks
292            .iter()
293            .map(|task| &task.create_table)
294            .collect::<Vec<_>>()
295    }
296
297    /// Returns the remaining tasks.
298    /// The length of tasks must be greater than 0.
299    fn remaining_tasks(&self) -> Vec<(RawTableInfo, TableRouteValue)> {
300        self.tasks
301            .iter()
302            .zip(self.table_ids_already_exists.iter())
303            .flat_map(|(task, table_id)| {
304                if table_id.is_none() {
305                    let table_info = task.table_info.clone();
306                    let region_ids = self
307                        .physical_region_numbers
308                        .iter()
309                        .map(|region_number| {
310                            RegionId::new(table_info.ident.table_id, *region_number)
311                        })
312                        .collect();
313                    let table_route = TableRouteValue::logical(self.physical_table_id, region_ids);
314                    Some((table_info, table_route))
315                } else {
316                    None
317                }
318            })
319            .collect::<Vec<_>>()
320    }
321}
322
323#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)]
324pub enum CreateTablesState {
325    /// Prepares to create the tables
326    Prepare,
327    /// Creates regions on the Datanode
328    DatanodeCreateRegions,
329    /// Creates metadata
330    CreateMetadata,
331}