1pub(crate) mod raw_table_info;
16pub(crate) mod table_id;
17pub(crate) mod table_info;
18
19use std::collections::HashMap;
20use std::fmt::Debug;
21
22use api::region::RegionResponse;
23use api::v1::region::sync_request::ManifestInfo;
24use api::v1::region::{
25 region_request, MetricManifestInfo, MitoManifestInfo, RegionRequest, RegionRequestHeader,
26 SyncRequest,
27};
28use common_catalog::consts::{METRIC_ENGINE, MITO_ENGINE};
29use common_error::ext::BoxedError;
30use common_procedure::error::Error as ProcedureError;
31use common_telemetry::tracing_context::TracingContext;
32use common_telemetry::{error, info, warn};
33use common_wal::options::WalOptions;
34use futures::future::join_all;
35use snafu::{ensure, OptionExt, ResultExt};
36use store_api::metadata::ColumnMetadata;
37use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, MANIFEST_INFO_EXTENSION_KEY};
38use store_api::region_engine::RegionManifestInfo;
39use store_api::storage::{RegionId, RegionNumber};
40use table::metadata::TableId;
41use table::table_reference::TableReference;
42
43use crate::ddl::{DdlContext, DetectingRegion};
44use crate::error::{
45 self, DecodeJsonSnafu, Error, MetadataCorruptionSnafu, OperateDatanodeSnafu,
46 ParseWalOptionsSnafu, Result, TableNotFoundSnafu, UnsupportedSnafu,
47};
48use crate::key::datanode_table::DatanodeTableValue;
49use crate::key::table_name::TableNameKey;
50use crate::key::table_route::TableRouteValue;
51use crate::key::{TableMetadataManager, TableMetadataManagerRef};
52use crate::peer::Peer;
53use crate::rpc::ddl::CreateTableTask;
54use crate::rpc::router::{find_follower_regions, find_followers, RegionRoute};
55
56pub fn add_peer_context_if_needed(datanode: Peer) -> impl FnOnce(Error) -> Error {
58 move |err| {
59 error!(err; "Failed to operate datanode, peer: {}", datanode);
60 if !err.is_retry_later() {
61 return Err::<(), BoxedError>(BoxedError::new(err))
62 .context(OperateDatanodeSnafu { peer: datanode })
63 .unwrap_err();
64 }
65 err
66 }
67}
68
69pub fn map_to_procedure_error(e: Error) -> ProcedureError {
74 match (e.is_retry_later(), e.need_clean_poisons()) {
75 (true, true) => ProcedureError::retry_later_and_clean_poisons(e),
76 (true, false) => ProcedureError::retry_later(e),
77 (false, true) => ProcedureError::external_and_clean_poisons(e),
78 (false, false) => ProcedureError::external(e),
79 }
80}
81
/// Builds the region storage path for a table in `catalog`.`schema`, in the form
/// `{catalog}/{schema}`.
#[inline]
pub fn region_storage_path(catalog: &str, schema: &str) -> String {
    [catalog, schema].join("/")
}
86
/// Splits a region storage path (as produced by `region_storage_path`) back into its
/// catalog and schema components.
///
/// Returns `None` when `path` has fewer than two `/`-separated segments. Any segments
/// after the second one are ignored.
pub fn get_catalog_and_schema(path: &str) -> Option<(String, String)> {
    let mut segments = path.split('/');
    let catalog = segments.next()?;
    let schema = segments.next()?;
    Some((catalog.to_owned(), schema.to_owned()))
}
92
93pub async fn check_and_get_physical_table_id(
94 table_metadata_manager: &TableMetadataManagerRef,
95 tasks: &[CreateTableTask],
96) -> Result<TableId> {
97 let mut physical_table_name = None;
98 for task in tasks {
99 ensure!(
100 task.create_table.engine == METRIC_ENGINE,
101 UnsupportedSnafu {
102 operation: format!("create table with engine {}", task.create_table.engine)
103 }
104 );
105 let current_physical_table_name = task
106 .create_table
107 .table_options
108 .get(LOGICAL_TABLE_METADATA_KEY)
109 .context(UnsupportedSnafu {
110 operation: format!(
111 "create table without table options {}",
112 LOGICAL_TABLE_METADATA_KEY,
113 ),
114 })?;
115 let current_physical_table_name = TableNameKey::new(
116 &task.create_table.catalog_name,
117 &task.create_table.schema_name,
118 current_physical_table_name,
119 );
120
121 physical_table_name = match physical_table_name {
122 Some(name) => {
123 ensure!(
124 name == current_physical_table_name,
125 UnsupportedSnafu {
126 operation: format!(
127 "create table with different physical table name {} and {}",
128 name, current_physical_table_name
129 )
130 }
131 );
132 Some(name)
133 }
134 None => Some(current_physical_table_name),
135 };
136 }
137 let physical_table_name = physical_table_name.unwrap();
139 table_metadata_manager
140 .table_name_manager()
141 .get(physical_table_name)
142 .await?
143 .with_context(|| TableNotFoundSnafu {
144 table_name: TableReference::from(physical_table_name).to_string(),
145 })
146 .map(|table| table.table_id())
147}
148
149pub async fn get_physical_table_id(
150 table_metadata_manager: &TableMetadataManagerRef,
151 logical_table_name: TableNameKey<'_>,
152) -> Result<TableId> {
153 let logical_table_id = table_metadata_manager
154 .table_name_manager()
155 .get(logical_table_name)
156 .await?
157 .with_context(|| TableNotFoundSnafu {
158 table_name: TableReference::from(logical_table_name).to_string(),
159 })
160 .map(|table| table.table_id())?;
161
162 table_metadata_manager
163 .table_route_manager()
164 .get_physical_table_id(logical_table_id)
165 .await
166}
167
168pub fn convert_region_routes_to_detecting_regions(
170 region_routes: &[RegionRoute],
171) -> Vec<DetectingRegion> {
172 region_routes
173 .iter()
174 .flat_map(|route| {
175 route
176 .leader_peer
177 .as_ref()
178 .map(|peer| (peer.id, route.region.id))
179 })
180 .collect::<Vec<_>>()
181}
182
183pub fn parse_region_wal_options(
185 serialized_options: &HashMap<RegionNumber, String>,
186) -> Result<HashMap<RegionNumber, WalOptions>> {
187 let mut region_wal_options = HashMap::with_capacity(serialized_options.len());
188 for (region_number, wal_options) in serialized_options {
189 let wal_option = serde_json::from_str::<WalOptions>(wal_options)
190 .context(ParseWalOptionsSnafu { wal_options })?;
191 region_wal_options.insert(*region_number, wal_option);
192 }
193 Ok(region_wal_options)
194}
195
196pub async fn get_region_wal_options(
198 table_metadata_manager: &TableMetadataManager,
199 table_route_value: &TableRouteValue,
200 physical_table_id: TableId,
201) -> Result<HashMap<RegionNumber, WalOptions>> {
202 let region_wal_options =
203 if let TableRouteValue::Physical(table_route_value) = &table_route_value {
204 let datanode_table_values = table_metadata_manager
205 .datanode_table_manager()
206 .regions(physical_table_id, table_route_value)
207 .await?;
208 extract_region_wal_options(&datanode_table_values)?
209 } else {
210 HashMap::new()
211 };
212 Ok(region_wal_options)
213}
214
215pub fn extract_region_wal_options(
217 datanode_table_values: &Vec<DatanodeTableValue>,
218) -> Result<HashMap<RegionNumber, WalOptions>> {
219 let mut region_wal_options = HashMap::new();
220 for value in datanode_table_values {
221 let serialized_options = &value.region_info.region_wal_options;
222 let parsed_options = parse_region_wal_options(serialized_options)?;
223 region_wal_options.extend(parsed_options);
224 }
225 Ok(region_wal_options)
226}
227
/// Aggregated outcome of a batch of fallible operations, as classified by
/// [`handle_multiple_results`].
///
/// An error is considered retryable when `Error::is_retry_later` returns true. Variants that
/// carry an error hold only the first error of the relevant kind, in input order.
pub enum MultipleResults<T> {
    /// Every operation succeeded (also produced for an empty batch); holds all values.
    Ok(Vec<T>),
    /// Some operations succeeded and every failure is retryable; holds the first retryable
    /// error.
    PartialRetryable(Error),
    /// At least one, but not every, result is a non-retryable error; holds the first
    /// non-retryable error.
    PartialNonRetryable(Error),
    /// Every operation failed with a retryable error; holds the first one.
    AllRetryable(Error),
    /// Every operation failed with a non-retryable error; holds the first one.
    AllNonRetryable(Error),
}
242
243pub fn handle_multiple_results<T: Debug>(results: Vec<Result<T>>) -> MultipleResults<T> {
249 if results.is_empty() {
250 return MultipleResults::Ok(Vec::new());
251 }
252 let num_results = results.len();
253 let mut retryable_results = Vec::new();
254 let mut non_retryable_results = Vec::new();
255 let mut ok_results = Vec::new();
256
257 for result in results {
258 match result {
259 Ok(value) => ok_results.push(value),
260 Err(err) => {
261 if err.is_retry_later() {
262 retryable_results.push(err);
263 } else {
264 non_retryable_results.push(err);
265 }
266 }
267 }
268 }
269
270 common_telemetry::debug!(
271 "retryable_results: {}, non_retryable_results: {}, ok_results: {}",
272 retryable_results.len(),
273 non_retryable_results.len(),
274 ok_results.len()
275 );
276
277 if retryable_results.len() == num_results {
278 return MultipleResults::AllRetryable(retryable_results.into_iter().next().unwrap());
279 } else if non_retryable_results.len() == num_results {
280 warn!("all non retryable results: {}", non_retryable_results.len());
281 for err in &non_retryable_results {
282 error!(err; "non retryable error");
283 }
284 return MultipleResults::AllNonRetryable(non_retryable_results.into_iter().next().unwrap());
285 } else if ok_results.len() == num_results {
286 return MultipleResults::Ok(ok_results);
287 } else if !retryable_results.is_empty()
288 && !ok_results.is_empty()
289 && non_retryable_results.is_empty()
290 {
291 return MultipleResults::PartialRetryable(retryable_results.into_iter().next().unwrap());
292 }
293
294 warn!(
295 "partial non retryable results: {}, retryable results: {}, ok results: {}",
296 non_retryable_results.len(),
297 retryable_results.len(),
298 ok_results.len()
299 );
300 for err in &non_retryable_results {
301 error!(err; "non retryable error");
302 }
303 MultipleResults::PartialNonRetryable(non_retryable_results.into_iter().next().unwrap())
305}
306
307pub fn parse_manifest_infos_from_extensions(
309 extensions: &HashMap<String, Vec<u8>>,
310) -> Result<Vec<(RegionId, RegionManifestInfo)>> {
311 let data_manifest_version =
312 extensions
313 .get(MANIFEST_INFO_EXTENSION_KEY)
314 .context(error::UnexpectedSnafu {
315 err_msg: "manifest info extension not found",
316 })?;
317 let data_manifest_version =
318 RegionManifestInfo::decode_list(data_manifest_version).context(error::SerdeJsonSnafu {})?;
319 Ok(data_manifest_version)
320}
321
322pub fn parse_column_metadatas(
324 extensions: &HashMap<String, Vec<u8>>,
325 key: &str,
326) -> Result<Vec<ColumnMetadata>> {
327 let value = extensions.get(key).context(error::UnexpectedSnafu {
328 err_msg: format!("column metadata extension not found: {}", key),
329 })?;
330 let column_metadatas = ColumnMetadata::decode_list(value).context(error::SerdeJsonSnafu {})?;
331 Ok(column_metadatas)
332}
333
334pub async fn sync_follower_regions(
336 context: &DdlContext,
337 table_id: TableId,
338 results: &[RegionResponse],
339 region_routes: &[RegionRoute],
340 engine: &str,
341) -> Result<()> {
342 if engine != MITO_ENGINE && engine != METRIC_ENGINE {
343 info!(
344 "Skip submitting sync region requests for table_id: {}, engine: {}",
345 table_id, engine
346 );
347 return Ok(());
348 }
349
350 let results = results
351 .iter()
352 .map(|response| parse_manifest_infos_from_extensions(&response.extensions))
353 .collect::<Result<Vec<_>>>()?
354 .into_iter()
355 .flatten()
356 .collect::<HashMap<_, _>>();
357
358 let is_mito_engine = engine == MITO_ENGINE;
359
360 let followers = find_followers(region_routes);
361 if followers.is_empty() {
362 return Ok(());
363 }
364 let mut sync_region_tasks = Vec::with_capacity(followers.len());
365 for datanode in followers {
366 let requester = context.node_manager.datanode(&datanode).await;
367 let regions = find_follower_regions(region_routes, &datanode);
368 for region in regions {
369 let region_id = RegionId::new(table_id, region);
370 let manifest_info = if is_mito_engine {
371 let region_manifest_info =
372 results.get(®ion_id).context(error::UnexpectedSnafu {
373 err_msg: format!("No manifest info found for region {}", region_id),
374 })?;
375 ensure!(
376 region_manifest_info.is_mito(),
377 error::UnexpectedSnafu {
378 err_msg: format!("Region {} is not a mito region", region_id)
379 }
380 );
381 ManifestInfo::MitoManifestInfo(MitoManifestInfo {
382 data_manifest_version: region_manifest_info.data_manifest_version(),
383 })
384 } else {
385 let region_manifest_info =
386 results.get(®ion_id).context(error::UnexpectedSnafu {
387 err_msg: format!("No manifest info found for region {}", region_id),
388 })?;
389 ensure!(
390 region_manifest_info.is_metric(),
391 error::UnexpectedSnafu {
392 err_msg: format!("Region {} is not a metric region", region_id)
393 }
394 );
395 ManifestInfo::MetricManifestInfo(MetricManifestInfo {
396 data_manifest_version: region_manifest_info.data_manifest_version(),
397 metadata_manifest_version: region_manifest_info
398 .metadata_manifest_version()
399 .unwrap_or_default(),
400 })
401 };
402 let request = RegionRequest {
403 header: Some(RegionRequestHeader {
404 tracing_context: TracingContext::from_current_span().to_w3c(),
405 ..Default::default()
406 }),
407 body: Some(region_request::Body::Sync(SyncRequest {
408 region_id: region_id.as_u64(),
409 manifest_info: Some(manifest_info),
410 })),
411 };
412
413 let datanode = datanode.clone();
414 let requester = requester.clone();
415 sync_region_tasks.push(async move {
416 requester
417 .handle(request)
418 .await
419 .map_err(add_peer_context_if_needed(datanode))
420 });
421 }
422 }
423
424 if let Err(err) = join_all(sync_region_tasks)
427 .await
428 .into_iter()
429 .collect::<Result<Vec<_>>>()
430 {
431 error!(err; "Failed to sync follower regions on datanodes, table_id: {}", table_id);
432 }
433 info!("Sync follower regions on datanodes, table_id: {}", table_id);
434
435 Ok(())
436}
437
438pub fn extract_column_metadatas(
440 results: &mut [RegionResponse],
441 key: &str,
442) -> Result<Option<Vec<ColumnMetadata>>> {
443 let schemas = results
444 .iter_mut()
445 .map(|r| r.extensions.remove(key))
446 .collect::<Vec<_>>();
447
448 if schemas.is_empty() {
449 return Ok(None);
450 }
451
452 let first = schemas.first().unwrap();
455 ensure!(
456 schemas.iter().all(|x| x == first),
457 MetadataCorruptionSnafu {
458 err_msg: "The table column metadata schemas from datanodes are not the same."
459 }
460 );
461
462 if let Some(first) = first {
463 let column_metadatas = ColumnMetadata::decode_list(first).context(DecodeJsonSnafu)?;
464 Ok(Some(column_metadatas))
465 } else {
466 Ok(None)
467 }
468}
469
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_get_catalog_and_schema() {
        let test_catalog = "my_catalog";
        let test_schema = "my_schema";
        let path = region_storage_path(test_catalog, test_schema);
        let (catalog, schema) = get_catalog_and_schema(&path).unwrap();
        assert_eq!(catalog, test_catalog);
        assert_eq!(schema, test_schema);
    }

    #[test]
    fn test_get_catalog_and_schema_malformed_path() {
        // Without a `/` separator there is no schema component.
        assert!(get_catalog_and_schema("my_catalog").is_none());
        assert!(get_catalog_and_schema("").is_none());
        // Only the first two segments are used; trailing segments are ignored.
        assert_eq!(
            get_catalog_and_schema("my_catalog/my_schema/extra"),
            Some(("my_catalog".to_string(), "my_schema".to_string()))
        );
    }
}