common_meta/ddl/create_table/
executor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use api::v1::region::region_request::Body as PbRegionRequest;
18use api::v1::region::{RegionRequest, RegionRequestHeader};
19use common_telemetry::tracing_context::TracingContext;
20use common_telemetry::warn;
21use futures::future::join_all;
22use snafu::ensure;
23use store_api::metadata::ColumnMetadata;
24use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY;
25use store_api::storage::{RegionId, RegionNumber};
26use table::metadata::{RawTableInfo, TableId};
27use table::table_name::TableName;
28
29use crate::ddl::utils::raw_table_info::update_table_info_column_ids;
30use crate::ddl::utils::{
31    add_peer_context_if_needed, convert_region_routes_to_detecting_regions,
32    extract_column_metadatas, region_storage_path,
33};
34use crate::ddl::{CreateRequestBuilder, RegionFailureDetectorControllerRef};
35use crate::error::{self, Result};
36use crate::key::TableMetadataManagerRef;
37use crate::key::table_name::TableNameKey;
38use crate::key::table_route::{PhysicalTableRouteValue, TableRouteValue};
39use crate::node_manager::NodeManagerRef;
40use crate::rpc::router::{RegionRoute, find_leader_regions, find_leaders};
41
42/// [CreateTableExecutor] performs:
43/// - Creates the metadata of the table.
44/// - Creates the regions on the Datanode nodes.
45pub struct CreateTableExecutor {
46    create_if_not_exists: bool,
47    table_name: TableName,
48    builder: CreateRequestBuilder,
49}
50
51impl CreateTableExecutor {
52    /// Creates a new [`CreateTableExecutor`].
53    pub fn new(
54        table_name: TableName,
55        create_if_not_exists: bool,
56        builder: CreateRequestBuilder,
57    ) -> Self {
58        Self {
59            create_if_not_exists,
60            table_name,
61            builder,
62        }
63    }
64
65    /// On the prepare step, it performs:
66    /// - Checks whether the table exists.
67    /// - Returns the table id if the table exists.
68    ///
69    /// Abort(non-retry):
70    /// - Table exists and `create_if_not_exists` is `false`.
71    /// - Failed to get the table name value.
72    pub async fn on_prepare(
73        &self,
74        table_metadata_manager: &TableMetadataManagerRef,
75    ) -> Result<Option<TableId>> {
76        let table_name_value = table_metadata_manager
77            .table_name_manager()
78            .get(TableNameKey::new(
79                &self.table_name.catalog_name,
80                &self.table_name.schema_name,
81                &self.table_name.table_name,
82            ))
83            .await?;
84
85        if let Some(value) = table_name_value {
86            ensure!(
87                self.create_if_not_exists,
88                error::TableAlreadyExistsSnafu {
89                    table_name: self.table_name.to_string(),
90                }
91            );
92
93            return Ok(Some(value.table_id()));
94        }
95
96        Ok(None)
97    }
98
99    pub async fn on_create_regions(
100        &self,
101        node_manager: &NodeManagerRef,
102        table_id: TableId,
103        region_routes: &[RegionRoute],
104        region_wal_options: &HashMap<RegionNumber, String>,
105    ) -> Result<Vec<ColumnMetadata>> {
106        let storage_path =
107            region_storage_path(&self.table_name.catalog_name, &self.table_name.schema_name);
108        let leaders = find_leaders(region_routes);
109        let mut create_region_tasks = Vec::with_capacity(leaders.len());
110        let partition_exprs = region_routes
111            .iter()
112            .map(|r| (r.region.id.region_number(), r.region.partition_expr()))
113            .collect::<HashMap<_, _>>();
114
115        for datanode in leaders {
116            let requester = node_manager.datanode(&datanode).await;
117
118            let regions = find_leader_regions(region_routes, &datanode);
119            let mut requests = Vec::with_capacity(regions.len());
120            for region_number in regions {
121                let region_id = RegionId::new(table_id, region_number);
122                let create_region_request = self.builder.build_one(
123                    region_id,
124                    storage_path.clone(),
125                    region_wal_options,
126                    &partition_exprs,
127                );
128                requests.push(PbRegionRequest::Create(create_region_request));
129            }
130
131            for request in requests {
132                let request = RegionRequest {
133                    header: Some(RegionRequestHeader {
134                        tracing_context: TracingContext::from_current_span().to_w3c(),
135                        ..Default::default()
136                    }),
137                    body: Some(request),
138                };
139
140                let datanode = datanode.clone();
141                let requester = requester.clone();
142                create_region_tasks.push(async move {
143                    requester
144                        .handle(request)
145                        .await
146                        .map_err(add_peer_context_if_needed(datanode))
147                });
148            }
149        }
150
151        let mut results = join_all(create_region_tasks)
152            .await
153            .into_iter()
154            .collect::<Result<Vec<_>>>()?;
155
156        let column_metadatas = if let Some(column_metadatas) =
157            extract_column_metadatas(&mut results, TABLE_COLUMN_METADATA_EXTENSION_KEY)?
158        {
159            column_metadatas
160        } else {
161            warn!(
162                "creating table result doesn't contains extension key `{TABLE_COLUMN_METADATA_EXTENSION_KEY}`,leaving the table's column metadata unchanged"
163            );
164            vec![]
165        };
166
167        Ok(column_metadatas)
168    }
169
170    /// Creates table metadata
171    ///
172    /// Abort(non-retry):
173    /// - Failed to create table metadata.
174    pub async fn on_create_metadata(
175        &self,
176        table_metadata_manager: &TableMetadataManagerRef,
177        region_failure_detector_controller: &RegionFailureDetectorControllerRef,
178        mut raw_table_info: RawTableInfo,
179        column_metadatas: &[ColumnMetadata],
180        table_route: PhysicalTableRouteValue,
181        region_wal_options: HashMap<RegionNumber, String>,
182    ) -> Result<()> {
183        if !column_metadatas.is_empty() {
184            update_table_info_column_ids(&mut raw_table_info, column_metadatas);
185        }
186        let detecting_regions =
187            convert_region_routes_to_detecting_regions(&table_route.region_routes);
188        let table_route = TableRouteValue::Physical(table_route);
189        table_metadata_manager
190            .create_table_metadata(raw_table_info, table_route, region_wal_options)
191            .await?;
192        region_failure_detector_controller
193            .register_failure_detectors(detecting_regions)
194            .await;
195
196        Ok(())
197    }
198
199    /// Returns the builder of the executor.
200    pub fn builder(&self) -> &CreateRequestBuilder {
201        &self.builder
202    }
203}