common_meta/ddl/
create_logical_tables.rs1mod check;
16mod metadata;
17mod region_request;
18mod update_metadata;
19
20use api::region::RegionResponse;
21use api::v1::CreateTableExpr;
22use async_trait::async_trait;
23use common_catalog::consts::METRIC_ENGINE;
24use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
25use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
26use common_telemetry::{debug, error, warn};
27use futures::future;
28use serde::{Deserialize, Serialize};
29use snafu::{ensure, ResultExt};
30use store_api::metadata::ColumnMetadata;
31use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
32use store_api::storage::{RegionId, RegionNumber};
33use strum::AsRefStr;
34use table::metadata::{RawTableInfo, TableId};
35
36use crate::ddl::utils::{
37 add_peer_context_if_needed, map_to_procedure_error, sync_follower_regions,
38};
39use crate::ddl::DdlContext;
40use crate::error::{DecodeJsonSnafu, MetadataCorruptionSnafu, Result};
41use crate::key::table_route::TableRouteValue;
42use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
43use crate::metrics;
44use crate::rpc::ddl::CreateTableTask;
45use crate::rpc::router::{find_leaders, RegionRoute};
46
47pub struct CreateLogicalTablesProcedure {
48 pub context: DdlContext,
49 pub data: CreateTablesData,
50}
51
52impl CreateLogicalTablesProcedure {
53 pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateLogicalTables";
54
55 pub fn new(
56 tasks: Vec<CreateTableTask>,
57 physical_table_id: TableId,
58 context: DdlContext,
59 ) -> Self {
60 Self {
61 context,
62 data: CreateTablesData {
63 state: CreateTablesState::Prepare,
64 tasks,
65 table_ids_already_exists: vec![],
66 physical_table_id,
67 physical_region_numbers: vec![],
68 physical_columns: vec![],
69 },
70 }
71 }
72
73 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
74 let data = serde_json::from_str(json).context(FromJsonSnafu)?;
75 Ok(Self { context, data })
76 }
77
78 pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
89 self.check_input_tasks()?;
90 self.fill_physical_table_info().await?;
92 self.check_tables_already_exist().await?;
94
95 if self
97 .data
98 .table_ids_already_exists
99 .iter()
100 .all(Option::is_some)
101 {
102 return Ok(Status::done_with_output(
103 self.data
104 .table_ids_already_exists
105 .drain(..)
106 .flatten()
107 .collect::<Vec<_>>(),
108 ));
109 }
110
111 self.allocate_table_ids().await?;
113
114 self.data.state = CreateTablesState::DatanodeCreateRegions;
115 Ok(Status::executing(true))
116 }
117
118 pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
119 let (_, physical_table_route) = self
120 .context
121 .table_metadata_manager
122 .table_route_manager()
123 .get_physical_table_route(self.data.physical_table_id)
124 .await?;
125
126 self.create_regions(&physical_table_route.region_routes)
127 .await
128 }
129
130 pub async fn on_create_metadata(&mut self) -> Result<Status> {
136 self.update_physical_table_metadata().await?;
137 let table_ids = self.create_logical_tables_metadata().await?;
138
139 Ok(Status::done_with_output(table_ids))
140 }
141
142 async fn create_regions(&mut self, region_routes: &[RegionRoute]) -> Result<Status> {
143 let leaders = find_leaders(region_routes);
144 let mut create_region_tasks = Vec::with_capacity(leaders.len());
145
146 for peer in leaders {
147 let requester = self.context.node_manager.datanode(&peer).await;
148 let Some(request) = self.make_request(&peer, region_routes)? else {
149 debug!("no region request to send to datanode {}", peer);
150 break;
153 };
154
155 create_region_tasks.push(async move {
156 requester
157 .handle(request)
158 .await
159 .map_err(add_peer_context_if_needed(peer))
160 });
161 }
162
163 let mut results = future::join_all(create_region_tasks)
164 .await
165 .into_iter()
166 .collect::<Result<Vec<_>>>()?;
167
168 let phy_raw_schemas = results
170 .iter_mut()
171 .map(|res| res.extensions.remove(ALTER_PHYSICAL_EXTENSION_KEY))
172 .collect::<Vec<_>>();
173
174 if phy_raw_schemas.is_empty() {
175 self.submit_sync_region_requests(results, region_routes)
176 .await;
177 self.data.state = CreateTablesState::CreateMetadata;
178 return Ok(Status::executing(false));
179 }
180
181 let first = phy_raw_schemas.first().unwrap();
184 ensure!(
185 phy_raw_schemas.iter().all(|x| x == first),
186 MetadataCorruptionSnafu {
187 err_msg: "The physical schemas from datanodes are not the same."
188 }
189 );
190
191 if let Some(phy_raw_schemas) = first {
193 self.data.physical_columns =
194 ColumnMetadata::decode_list(phy_raw_schemas).context(DecodeJsonSnafu)?;
195 } else {
196 warn!("creating logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged");
197 }
198
199 self.submit_sync_region_requests(results, region_routes)
200 .await;
201 self.data.state = CreateTablesState::CreateMetadata;
202
203 Ok(Status::executing(true))
204 }
205
206 async fn submit_sync_region_requests(
207 &self,
208 results: Vec<RegionResponse>,
209 region_routes: &[RegionRoute],
210 ) {
211 if let Err(err) = sync_follower_regions(
212 &self.context,
213 self.data.physical_table_id,
214 results,
215 region_routes,
216 METRIC_ENGINE,
217 )
218 .await
219 {
220 error!(err; "Failed to sync regions for physical table_id: {}",self.data.physical_table_id);
221 }
222 }
223}
224
225#[async_trait]
226impl Procedure for CreateLogicalTablesProcedure {
227 fn type_name(&self) -> &str {
228 Self::TYPE_NAME
229 }
230
231 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
232 let state = &self.data.state;
233
234 let _timer = metrics::METRIC_META_PROCEDURE_CREATE_TABLES
235 .with_label_values(&[state.as_ref()])
236 .start_timer();
237
238 match state {
239 CreateTablesState::Prepare => self.on_prepare().await,
240 CreateTablesState::DatanodeCreateRegions => self.on_datanode_create_regions().await,
241 CreateTablesState::CreateMetadata => self.on_create_metadata().await,
242 }
243 .map_err(map_to_procedure_error)
244 }
245
246 fn dump(&self) -> ProcedureResult<String> {
247 serde_json::to_string(&self.data).context(ToJsonSnafu)
248 }
249
250 fn lock_key(&self) -> LockKey {
251 let mut lock_key = Vec::with_capacity(2 + 1 + self.data.tasks.len());
255 let table_ref = self.data.tasks[0].table_ref();
256 lock_key.push(CatalogLock::Read(table_ref.catalog).into());
257 lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
258 lock_key.push(TableLock::Write(self.data.physical_table_id).into());
259
260 for task in &self.data.tasks {
261 lock_key.push(
262 TableNameLock::new(
263 &task.create_table.catalog_name,
264 &task.create_table.schema_name,
265 &task.create_table.table_name,
266 )
267 .into(),
268 );
269 }
270 LockKey::new(lock_key)
271 }
272}
273
274#[derive(Debug, Serialize, Deserialize)]
275pub struct CreateTablesData {
276 state: CreateTablesState,
277 tasks: Vec<CreateTableTask>,
278 table_ids_already_exists: Vec<Option<TableId>>,
279 physical_table_id: TableId,
280 physical_region_numbers: Vec<RegionNumber>,
281 physical_columns: Vec<ColumnMetadata>,
282}
283
284impl CreateTablesData {
285 pub fn state(&self) -> &CreateTablesState {
286 &self.state
287 }
288
289 fn all_create_table_exprs(&self) -> Vec<&CreateTableExpr> {
290 self.tasks
291 .iter()
292 .map(|task| &task.create_table)
293 .collect::<Vec<_>>()
294 }
295
296 fn remaining_tasks(&self) -> Vec<(RawTableInfo, TableRouteValue)> {
299 self.tasks
300 .iter()
301 .zip(self.table_ids_already_exists.iter())
302 .flat_map(|(task, table_id)| {
303 if table_id.is_none() {
304 let table_info = task.table_info.clone();
305 let region_ids = self
306 .physical_region_numbers
307 .iter()
308 .map(|region_number| {
309 RegionId::new(table_info.ident.table_id, *region_number)
310 })
311 .collect();
312 let table_route = TableRouteValue::logical(self.physical_table_id, region_ids);
313 Some((table_info, table_route))
314 } else {
315 None
316 }
317 })
318 .collect::<Vec<_>>()
319 }
320}
321
322#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)]
323pub enum CreateTablesState {
324 Prepare,
326 DatanodeCreateRegions,
328 CreateMetadata,
330}