common_meta/ddl/
create_logical_tables.rs1mod check;
16mod metadata;
17mod region_request;
18mod update_metadata;
19
20use api::region::RegionResponse;
21use api::v1::CreateTableExpr;
22use async_trait::async_trait;
23use common_catalog::consts::METRIC_ENGINE;
24use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
25use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
26use common_telemetry::{debug, error, warn};
27use futures::future;
28use serde::{Deserialize, Serialize};
29use snafu::{ensure, ResultExt};
30use store_api::metadata::ColumnMetadata;
31use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
32use store_api::storage::{RegionId, RegionNumber};
33use strum::AsRefStr;
34use table::metadata::{RawTableInfo, TableId};
35
36use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error, sync_follower_regions};
37use crate::ddl::DdlContext;
38use crate::error::{DecodeJsonSnafu, MetadataCorruptionSnafu, Result};
39use crate::key::table_route::TableRouteValue;
40use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
41use crate::metrics;
42use crate::rpc::ddl::CreateTableTask;
43use crate::rpc::router::{find_leaders, RegionRoute};
44
/// Procedure that creates a batch of logical tables backed by one physical table.
///
/// It is driven step by step through [`Procedure::execute`], which dispatches on
/// the state stored in `data`.
45pub struct CreateLogicalTablesProcedure {
    /// Shared DDL context; this file uses it to reach the node manager and the
    /// table metadata manager.
46 pub context: DdlContext,
    /// The procedure's own data: the value written by `dump` and read back by
    /// `from_json`.
47 pub data: CreateTablesData,
48}
49
50impl CreateLogicalTablesProcedure {
/// Type name of this procedure, as returned by its `Procedure::type_name` implementation.
51 pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateLogicalTables";
52
53 pub fn new(
54 tasks: Vec<CreateTableTask>,
55 physical_table_id: TableId,
56 context: DdlContext,
57 ) -> Self {
58 Self {
59 context,
60 data: CreateTablesData {
61 state: CreateTablesState::Prepare,
62 tasks,
63 table_ids_already_exists: vec![],
64 physical_table_id,
65 physical_region_numbers: vec![],
66 physical_columns: vec![],
67 },
68 }
69 }
70
71 pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
72 let data = serde_json::from_str(json).context(FromJsonSnafu)?;
73 Ok(Self { context, data })
74 }
75
76 pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
87 self.check_input_tasks()?;
88 self.fill_physical_table_info().await?;
90 self.check_tables_already_exist().await?;
92
93 if self
95 .data
96 .table_ids_already_exists
97 .iter()
98 .all(Option::is_some)
99 {
100 return Ok(Status::done_with_output(
101 self.data
102 .table_ids_already_exists
103 .drain(..)
104 .flatten()
105 .collect::<Vec<_>>(),
106 ));
107 }
108
109 self.allocate_table_ids().await?;
111
112 self.data.state = CreateTablesState::DatanodeCreateRegions;
113 Ok(Status::executing(true))
114 }
115
116 pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
117 let (_, physical_table_route) = self
118 .context
119 .table_metadata_manager
120 .table_route_manager()
121 .get_physical_table_route(self.data.physical_table_id)
122 .await?;
123
124 self.create_regions(&physical_table_route.region_routes)
125 .await
126 }
127
128 pub async fn on_create_metadata(&mut self) -> Result<Status> {
134 self.update_physical_table_metadata().await?;
135 let table_ids = self.create_logical_tables_metadata().await?;
136
137 Ok(Status::done_with_output(table_ids))
138 }
139
140 async fn create_regions(&mut self, region_routes: &[RegionRoute]) -> Result<Status> {
141 let leaders = find_leaders(region_routes);
142 let mut create_region_tasks = Vec::with_capacity(leaders.len());
143
144 for peer in leaders {
145 let requester = self.context.node_manager.datanode(&peer).await;
146 let Some(request) = self.make_request(&peer, region_routes)? else {
147 debug!("no region request to send to datanode {}", peer);
148 break;
151 };
152
153 create_region_tasks.push(async move {
154 requester
155 .handle(request)
156 .await
157 .map_err(add_peer_context_if_needed(peer))
158 });
159 }
160
161 let mut results = future::join_all(create_region_tasks)
162 .await
163 .into_iter()
164 .collect::<Result<Vec<_>>>()?;
165
166 let phy_raw_schemas = results
168 .iter_mut()
169 .map(|res| res.extensions.remove(ALTER_PHYSICAL_EXTENSION_KEY))
170 .collect::<Vec<_>>();
171
172 if phy_raw_schemas.is_empty() {
173 self.submit_sync_region_requests(results, region_routes)
174 .await;
175 self.data.state = CreateTablesState::CreateMetadata;
176 return Ok(Status::executing(false));
177 }
178
179 let first = phy_raw_schemas.first().unwrap();
182 ensure!(
183 phy_raw_schemas.iter().all(|x| x == first),
184 MetadataCorruptionSnafu {
185 err_msg: "The physical schemas from datanodes are not the same."
186 }
187 );
188
189 if let Some(phy_raw_schemas) = first {
191 self.data.physical_columns =
192 ColumnMetadata::decode_list(phy_raw_schemas).context(DecodeJsonSnafu)?;
193 } else {
194 warn!("creating logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged");
195 }
196
197 self.submit_sync_region_requests(results, region_routes)
198 .await;
199 self.data.state = CreateTablesState::CreateMetadata;
200
201 Ok(Status::executing(true))
202 }
203
204 async fn submit_sync_region_requests(
205 &self,
206 results: Vec<RegionResponse>,
207 region_routes: &[RegionRoute],
208 ) {
209 if let Err(err) = sync_follower_regions(
210 &self.context,
211 self.data.physical_table_id,
212 results,
213 region_routes,
214 METRIC_ENGINE,
215 )
216 .await
217 {
218 error!(err; "Failed to sync regions for physical table_id: {}",self.data.physical_table_id);
219 }
220 }
221}
222
223#[async_trait]
224impl Procedure for CreateLogicalTablesProcedure {
/// Returns [`CreateLogicalTablesProcedure::TYPE_NAME`].
225 fn type_name(&self) -> &str {
226 Self::TYPE_NAME
227 }
228
229 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
230 let state = &self.data.state;
231
232 let _timer = metrics::METRIC_META_PROCEDURE_CREATE_TABLES
233 .with_label_values(&[state.as_ref()])
234 .start_timer();
235
236 match state {
237 CreateTablesState::Prepare => self.on_prepare().await,
238 CreateTablesState::DatanodeCreateRegions => self.on_datanode_create_regions().await,
239 CreateTablesState::CreateMetadata => self.on_create_metadata().await,
240 }
241 .map_err(handle_retry_error)
242 }
243
/// Serializes `self.data` to a JSON string.
///
/// A serialization failure is returned with `ToJsonSnafu` context.
244 fn dump(&self) -> ProcedureResult<String> {
245 serde_json::to_string(&self.data).context(ToJsonSnafu)
246 }
247
248 fn lock_key(&self) -> LockKey {
249 let mut lock_key = Vec::with_capacity(2 + 1 + self.data.tasks.len());
253 let table_ref = self.data.tasks[0].table_ref();
254 lock_key.push(CatalogLock::Read(table_ref.catalog).into());
255 lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
256 lock_key.push(TableLock::Write(self.data.physical_table_id).into());
257
258 for task in &self.data.tasks {
259 lock_key.push(
260 TableNameLock::new(
261 &task.create_table.catalog_name,
262 &task.create_table.schema_name,
263 &task.create_table.table_name,
264 )
265 .into(),
266 );
267 }
268 LockKey::new(lock_key)
269 }
270}
271
/// Serializable data of [`CreateLogicalTablesProcedure`].
///
/// This is the value written by `dump` and read back by `from_json`.
272#[derive(Debug, Serialize, Deserialize)]
273pub struct CreateTablesData {
    /// The step `execute` runs next.
274 state: CreateTablesState,
    /// The create-table tasks, one per logical table.
275 tasks: Vec<CreateTableTask>,
    /// Paired position-by-position with `tasks` in `remaining_tasks`; a task
    /// whose entry is `None` is treated there as still to be created.
    /// NOTE(review): presumably filled by `check_tables_already_exist` — confirm.
276 table_ids_already_exists: Vec<Option<TableId>>,
    /// Id of the physical table the logical tables are attached to.
277 physical_table_id: TableId,
    /// Region numbers used by `remaining_tasks` to build the region ids of each
    /// logical table.
    /// NOTE(review): presumably filled by `fill_physical_table_info` — confirm.
278 physical_region_numbers: Vec<RegionNumber>,
    /// Columns decoded in `create_regions` from the `ALTER_PHYSICAL_EXTENSION_KEY`
    /// extension of the region responses; empty until then.
279 physical_columns: Vec<ColumnMetadata>,
280}
281
282impl CreateTablesData {
/// Returns a reference to the current procedure state.
283 pub fn state(&self) -> &CreateTablesState {
284 &self.state
285 }
286
287 fn all_create_table_exprs(&self) -> Vec<&CreateTableExpr> {
288 self.tasks
289 .iter()
290 .map(|task| &task.create_table)
291 .collect::<Vec<_>>()
292 }
293
294 fn remaining_tasks(&self) -> Vec<(RawTableInfo, TableRouteValue)> {
297 self.tasks
298 .iter()
299 .zip(self.table_ids_already_exists.iter())
300 .flat_map(|(task, table_id)| {
301 if table_id.is_none() {
302 let table_info = task.table_info.clone();
303 let region_ids = self
304 .physical_region_numbers
305 .iter()
306 .map(|region_number| {
307 RegionId::new(table_info.ident.table_id, *region_number)
308 })
309 .collect();
310 let table_route = TableRouteValue::logical(self.physical_table_id, region_ids);
311 Some((table_info, table_route))
312 } else {
313 None
314 }
315 })
316 .collect::<Vec<_>>()
317 }
318}
319
/// The steps of [`CreateLogicalTablesProcedure`].
///
/// The variant name (via `AsRefStr`) is used as the metric label in `execute`.
320#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)]
321pub enum CreateTablesState {
    /// Initial state; handled by `on_prepare`.
322 Prepare,
    /// Handled by `on_datanode_create_regions`.
324 DatanodeCreateRegions,
    /// Handled by `on_create_metadata`.
326 CreateMetadata,
328}