common_meta/ddl/create_table/
executor.rs1use std::collections::HashMap;
16
17use api::v1::region::region_request::Body as PbRegionRequest;
18use api::v1::region::{RegionRequest, RegionRequestHeader};
19use common_telemetry::tracing_context::TracingContext;
20use common_telemetry::warn;
21use futures::future::join_all;
22use snafu::ensure;
23use store_api::metadata::ColumnMetadata;
24use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY;
25use store_api::storage::{RegionId, RegionNumber};
26use table::metadata::{RawTableInfo, TableId};
27use table::table_name::TableName;
28
29use crate::ddl::utils::raw_table_info::update_table_info_column_ids;
30use crate::ddl::utils::{
31 add_peer_context_if_needed, convert_region_routes_to_detecting_regions,
32 extract_column_metadatas, region_storage_path,
33};
34use crate::ddl::{CreateRequestBuilder, RegionFailureDetectorControllerRef};
35use crate::error::{self, Result};
36use crate::key::TableMetadataManagerRef;
37use crate::key::table_name::TableNameKey;
38use crate::key::table_route::{PhysicalTableRouteValue, TableRouteValue};
39use crate::node_manager::NodeManagerRef;
40use crate::rpc::router::{RegionRoute, find_leader_regions, find_leaders};
41
42pub struct CreateTableExecutor {
46 create_if_not_exists: bool,
47 table_name: TableName,
48 builder: CreateRequestBuilder,
49}
50
51impl CreateTableExecutor {
52 pub fn new(
54 table_name: TableName,
55 create_if_not_exists: bool,
56 builder: CreateRequestBuilder,
57 ) -> Self {
58 Self {
59 create_if_not_exists,
60 table_name,
61 builder,
62 }
63 }
64
65 pub async fn on_prepare(
73 &self,
74 table_metadata_manager: &TableMetadataManagerRef,
75 ) -> Result<Option<TableId>> {
76 let table_name_value = table_metadata_manager
77 .table_name_manager()
78 .get(TableNameKey::new(
79 &self.table_name.catalog_name,
80 &self.table_name.schema_name,
81 &self.table_name.table_name,
82 ))
83 .await?;
84
85 if let Some(value) = table_name_value {
86 ensure!(
87 self.create_if_not_exists,
88 error::TableAlreadyExistsSnafu {
89 table_name: self.table_name.to_string(),
90 }
91 );
92
93 return Ok(Some(value.table_id()));
94 }
95
96 Ok(None)
97 }
98
99 pub async fn on_create_regions(
100 &self,
101 node_manager: &NodeManagerRef,
102 table_id: TableId,
103 region_routes: &[RegionRoute],
104 region_wal_options: &HashMap<RegionNumber, String>,
105 ) -> Result<Vec<ColumnMetadata>> {
106 let storage_path =
107 region_storage_path(&self.table_name.catalog_name, &self.table_name.schema_name);
108 let leaders = find_leaders(region_routes);
109 let mut create_region_tasks = Vec::with_capacity(leaders.len());
110 let partition_exprs = region_routes
111 .iter()
112 .map(|r| (r.region.id.region_number(), r.region.partition_expr()))
113 .collect::<HashMap<_, _>>();
114
115 for datanode in leaders {
116 let requester = node_manager.datanode(&datanode).await;
117
118 let regions = find_leader_regions(region_routes, &datanode);
119 let mut requests = Vec::with_capacity(regions.len());
120 for region_number in regions {
121 let region_id = RegionId::new(table_id, region_number);
122 let create_region_request = self.builder.build_one(
123 region_id,
124 storage_path.clone(),
125 region_wal_options,
126 &partition_exprs,
127 );
128 requests.push(PbRegionRequest::Create(create_region_request));
129 }
130
131 for request in requests {
132 let request = RegionRequest {
133 header: Some(RegionRequestHeader {
134 tracing_context: TracingContext::from_current_span().to_w3c(),
135 ..Default::default()
136 }),
137 body: Some(request),
138 };
139
140 let datanode = datanode.clone();
141 let requester = requester.clone();
142 create_region_tasks.push(async move {
143 requester
144 .handle(request)
145 .await
146 .map_err(add_peer_context_if_needed(datanode))
147 });
148 }
149 }
150
151 let mut results = join_all(create_region_tasks)
152 .await
153 .into_iter()
154 .collect::<Result<Vec<_>>>()?;
155
156 let column_metadatas = if let Some(column_metadatas) =
157 extract_column_metadatas(&mut results, TABLE_COLUMN_METADATA_EXTENSION_KEY)?
158 {
159 column_metadatas
160 } else {
161 warn!(
162 "creating table result doesn't contains extension key `{TABLE_COLUMN_METADATA_EXTENSION_KEY}`,leaving the table's column metadata unchanged"
163 );
164 vec![]
165 };
166
167 Ok(column_metadatas)
168 }
169
170 pub async fn on_create_metadata(
175 &self,
176 table_metadata_manager: &TableMetadataManagerRef,
177 region_failure_detector_controller: &RegionFailureDetectorControllerRef,
178 mut raw_table_info: RawTableInfo,
179 column_metadatas: &[ColumnMetadata],
180 table_route: PhysicalTableRouteValue,
181 region_wal_options: HashMap<RegionNumber, String>,
182 ) -> Result<()> {
183 if !column_metadatas.is_empty() {
184 update_table_info_column_ids(&mut raw_table_info, column_metadatas);
185 }
186 let detecting_regions =
187 convert_region_routes_to_detecting_regions(&table_route.region_routes);
188 let table_route = TableRouteValue::Physical(table_route);
189 table_metadata_manager
190 .create_table_metadata(raw_table_info, table_route, region_wal_options)
191 .await?;
192 region_failure_detector_controller
193 .register_failure_detectors(detecting_regions)
194 .await;
195
196 Ok(())
197 }
198
199 pub fn builder(&self) -> &CreateRequestBuilder {
201 &self.builder
202 }
203}