common_meta/ddl/create_table/
executor.rs1use std::collections::HashMap;
16
17use api::v1::region::region_request::Body as PbRegionRequest;
18use api::v1::region::{RegionRequest, RegionRequestHeader};
19use common_telemetry::tracing_context::TracingContext;
20use common_telemetry::warn;
21use futures::future::join_all;
22use snafu::ensure;
23use store_api::metadata::ColumnMetadata;
24use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY;
25use store_api::storage::RegionId;
26use table::metadata::{TableId, TableInfo};
27use table::table_name::TableName;
28
29use crate::ddl::utils::raw_table_info::update_table_info_column_ids;
30use crate::ddl::utils::{
31 add_peer_context_if_needed, convert_region_routes_to_detecting_regions,
32 extract_column_metadatas, region_storage_path,
33};
34use crate::ddl::{CreateRequestBuilder, RegionFailureDetectorControllerRef};
35use crate::error::{self, Result};
36use crate::key::TableMetadataManagerRef;
37use crate::key::table_name::TableNameKey;
38use crate::key::table_route::{PhysicalTableRouteValue, TableRouteValue};
39use crate::node_manager::NodeManagerRef;
40use crate::rpc::router::{RegionRoute, find_leader_regions, find_leaders};
41use crate::wal_provider::RegionWalOptions;
42
43pub struct CreateTableExecutor {
47 create_if_not_exists: bool,
48 table_name: TableName,
49 builder: CreateRequestBuilder,
50}
51
52impl CreateTableExecutor {
53 pub fn new(
55 table_name: TableName,
56 create_if_not_exists: bool,
57 builder: CreateRequestBuilder,
58 ) -> Self {
59 Self {
60 create_if_not_exists,
61 table_name,
62 builder,
63 }
64 }
65
66 pub async fn on_prepare(
74 &self,
75 table_metadata_manager: &TableMetadataManagerRef,
76 ) -> Result<Option<TableId>> {
77 let table_name_value = table_metadata_manager
78 .table_name_manager()
79 .get(TableNameKey::new(
80 &self.table_name.catalog_name,
81 &self.table_name.schema_name,
82 &self.table_name.table_name,
83 ))
84 .await?;
85
86 if let Some(value) = table_name_value {
87 ensure!(
88 self.create_if_not_exists,
89 error::TableAlreadyExistsSnafu {
90 table_name: self.table_name.to_string(),
91 }
92 );
93
94 return Ok(Some(value.table_id()));
95 }
96
97 Ok(None)
98 }
99
100 pub async fn on_create_regions(
101 &self,
102 node_manager: &NodeManagerRef,
103 table_id: TableId,
104 region_routes: &[RegionRoute],
105 region_wal_options: &RegionWalOptions,
106 ) -> Result<Vec<ColumnMetadata>> {
107 let storage_path =
108 region_storage_path(&self.table_name.catalog_name, &self.table_name.schema_name);
109 let leaders = find_leaders(region_routes);
110 let mut create_region_tasks = Vec::with_capacity(leaders.len());
111 let partition_exprs = region_routes
112 .iter()
113 .map(|r| (r.region.id.region_number(), r.region.partition_expr()))
114 .collect::<HashMap<_, _>>();
115
116 for datanode in leaders {
117 let requester = node_manager.datanode(&datanode).await;
118
119 let regions = find_leader_regions(region_routes, &datanode);
120 let mut requests = Vec::with_capacity(regions.len());
121 for region_number in regions {
122 let region_id = RegionId::new(table_id, region_number);
123 let create_region_request = self.builder.build_one(
124 region_id,
125 storage_path.clone(),
126 region_wal_options,
127 &partition_exprs,
128 )?;
129 requests.push(PbRegionRequest::Create(create_region_request));
130 }
131
132 for request in requests {
133 let request = RegionRequest {
134 header: Some(RegionRequestHeader {
135 tracing_context: TracingContext::from_current_span().to_w3c(),
136 ..Default::default()
137 }),
138 body: Some(request),
139 };
140
141 let datanode = datanode.clone();
142 let requester = requester.clone();
143 create_region_tasks.push(async move {
144 requester
145 .handle(request)
146 .await
147 .map_err(add_peer_context_if_needed(datanode))
148 });
149 }
150 }
151
152 let mut results = join_all(create_region_tasks)
153 .await
154 .into_iter()
155 .collect::<Result<Vec<_>>>()?;
156
157 let column_metadatas = if let Some(column_metadatas) =
158 extract_column_metadatas(&mut results, TABLE_COLUMN_METADATA_EXTENSION_KEY)?
159 {
160 column_metadatas
161 } else {
162 warn!(
163 "creating table result doesn't contains extension key `{TABLE_COLUMN_METADATA_EXTENSION_KEY}`,leaving the table's column metadata unchanged"
164 );
165 vec![]
166 };
167
168 Ok(column_metadatas)
169 }
170
171 pub async fn on_create_metadata(
176 &self,
177 table_metadata_manager: &TableMetadataManagerRef,
178 region_failure_detector_controller: &RegionFailureDetectorControllerRef,
179 mut table_info: TableInfo,
180 column_metadatas: &[ColumnMetadata],
181 table_route: PhysicalTableRouteValue,
182 region_wal_options: RegionWalOptions,
183 ) -> Result<()> {
184 if !column_metadatas.is_empty() {
185 update_table_info_column_ids(&mut table_info, column_metadatas);
186 }
187 let detecting_regions =
188 convert_region_routes_to_detecting_regions(&table_route.region_routes);
189 let table_route = TableRouteValue::Physical(table_route);
190 table_metadata_manager
191 .create_table_metadata(table_info, table_route, region_wal_options)
192 .await?;
193 region_failure_detector_controller
194 .register_failure_detectors(detecting_regions)
195 .await;
196
197 Ok(())
198 }
199
200 pub fn builder(&self) -> &CreateRequestBuilder {
202 &self.builder
203 }
204}