Skip to main content

common_meta/ddl/create_table/
executor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use api::v1::region::region_request::Body as PbRegionRequest;
18use api::v1::region::{RegionRequest, RegionRequestHeader};
19use common_telemetry::tracing_context::TracingContext;
20use common_telemetry::warn;
21use futures::future::join_all;
22use snafu::ensure;
23use store_api::metadata::ColumnMetadata;
24use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY;
25use store_api::storage::RegionId;
26use table::metadata::{TableId, TableInfo};
27use table::table_name::TableName;
28
29use crate::ddl::utils::raw_table_info::update_table_info_column_ids;
30use crate::ddl::utils::{
31    add_peer_context_if_needed, convert_region_routes_to_detecting_regions,
32    extract_column_metadatas, region_storage_path,
33};
34use crate::ddl::{CreateRequestBuilder, RegionFailureDetectorControllerRef};
35use crate::error::{self, Result};
36use crate::key::TableMetadataManagerRef;
37use crate::key::table_name::TableNameKey;
38use crate::key::table_route::{PhysicalTableRouteValue, TableRouteValue};
39use crate::node_manager::NodeManagerRef;
40use crate::rpc::router::{RegionRoute, find_leader_regions, find_leaders};
41use crate::wal_provider::RegionWalOptions;
42
43/// [CreateTableExecutor] performs:
44/// - Creates the metadata of the table.
45/// - Creates the regions on the Datanode nodes.
46pub struct CreateTableExecutor {
47    create_if_not_exists: bool,
48    table_name: TableName,
49    builder: CreateRequestBuilder,
50}
51
52impl CreateTableExecutor {
53    /// Creates a new [`CreateTableExecutor`].
54    pub fn new(
55        table_name: TableName,
56        create_if_not_exists: bool,
57        builder: CreateRequestBuilder,
58    ) -> Self {
59        Self {
60            create_if_not_exists,
61            table_name,
62            builder,
63        }
64    }
65
66    /// On the prepare step, it performs:
67    /// - Checks whether the table exists.
68    /// - Returns the table id if the table exists.
69    ///
70    /// Abort(non-retry):
71    /// - Table exists and `create_if_not_exists` is `false`.
72    /// - Failed to get the table name value.
73    pub async fn on_prepare(
74        &self,
75        table_metadata_manager: &TableMetadataManagerRef,
76    ) -> Result<Option<TableId>> {
77        let table_name_value = table_metadata_manager
78            .table_name_manager()
79            .get(TableNameKey::new(
80                &self.table_name.catalog_name,
81                &self.table_name.schema_name,
82                &self.table_name.table_name,
83            ))
84            .await?;
85
86        if let Some(value) = table_name_value {
87            ensure!(
88                self.create_if_not_exists,
89                error::TableAlreadyExistsSnafu {
90                    table_name: self.table_name.to_string(),
91                }
92            );
93
94            return Ok(Some(value.table_id()));
95        }
96
97        Ok(None)
98    }
99
100    pub async fn on_create_regions(
101        &self,
102        node_manager: &NodeManagerRef,
103        table_id: TableId,
104        region_routes: &[RegionRoute],
105        region_wal_options: &RegionWalOptions,
106    ) -> Result<Vec<ColumnMetadata>> {
107        let storage_path =
108            region_storage_path(&self.table_name.catalog_name, &self.table_name.schema_name);
109        let leaders = find_leaders(region_routes);
110        let mut create_region_tasks = Vec::with_capacity(leaders.len());
111        let partition_exprs = region_routes
112            .iter()
113            .map(|r| (r.region.id.region_number(), r.region.partition_expr()))
114            .collect::<HashMap<_, _>>();
115
116        for datanode in leaders {
117            let requester = node_manager.datanode(&datanode).await;
118
119            let regions = find_leader_regions(region_routes, &datanode);
120            let mut requests = Vec::with_capacity(regions.len());
121            for region_number in regions {
122                let region_id = RegionId::new(table_id, region_number);
123                let create_region_request = self.builder.build_one(
124                    region_id,
125                    storage_path.clone(),
126                    region_wal_options,
127                    &partition_exprs,
128                )?;
129                requests.push(PbRegionRequest::Create(create_region_request));
130            }
131
132            for request in requests {
133                let request = RegionRequest {
134                    header: Some(RegionRequestHeader {
135                        tracing_context: TracingContext::from_current_span().to_w3c(),
136                        ..Default::default()
137                    }),
138                    body: Some(request),
139                };
140
141                let datanode = datanode.clone();
142                let requester = requester.clone();
143                create_region_tasks.push(async move {
144                    requester
145                        .handle(request)
146                        .await
147                        .map_err(add_peer_context_if_needed(datanode))
148                });
149            }
150        }
151
152        let mut results = join_all(create_region_tasks)
153            .await
154            .into_iter()
155            .collect::<Result<Vec<_>>>()?;
156
157        let column_metadatas = if let Some(column_metadatas) =
158            extract_column_metadatas(&mut results, TABLE_COLUMN_METADATA_EXTENSION_KEY)?
159        {
160            column_metadatas
161        } else {
162            warn!(
163                "creating table result doesn't contains extension key `{TABLE_COLUMN_METADATA_EXTENSION_KEY}`,leaving the table's column metadata unchanged"
164            );
165            vec![]
166        };
167
168        Ok(column_metadatas)
169    }
170
171    /// Creates table metadata
172    ///
173    /// Abort(non-retry):
174    /// - Failed to create table metadata.
175    pub async fn on_create_metadata(
176        &self,
177        table_metadata_manager: &TableMetadataManagerRef,
178        region_failure_detector_controller: &RegionFailureDetectorControllerRef,
179        mut table_info: TableInfo,
180        column_metadatas: &[ColumnMetadata],
181        table_route: PhysicalTableRouteValue,
182        region_wal_options: RegionWalOptions,
183    ) -> Result<()> {
184        if !column_metadatas.is_empty() {
185            update_table_info_column_ids(&mut table_info, column_metadatas);
186        }
187        let detecting_regions =
188            convert_region_routes_to_detecting_regions(&table_route.region_routes);
189        let table_route = TableRouteValue::Physical(table_route);
190        table_metadata_manager
191            .create_table_metadata(table_info, table_route, region_wal_options)
192            .await?;
193        region_failure_detector_controller
194            .register_failure_detectors(detecting_regions)
195            .await;
196
197        Ok(())
198    }
199
200    /// Returns the builder of the executor.
201    pub fn builder(&self) -> &CreateRequestBuilder {
202        &self.builder
203    }
204}