1pub(crate) mod raw_table_info;
16#[allow(dead_code)]
17pub(crate) mod region_metadata_lister;
18pub(crate) mod table_id;
19pub(crate) mod table_info;
20
21use std::collections::HashMap;
22use std::fmt::Debug;
23
24use api::region::RegionResponse;
25use api::v1::region::sync_request::ManifestInfo;
26use api::v1::region::{
27 region_request, MetricManifestInfo, MitoManifestInfo, RegionRequest, RegionRequestHeader,
28 SyncRequest,
29};
30use common_catalog::consts::{METRIC_ENGINE, MITO_ENGINE};
31use common_error::ext::BoxedError;
32use common_procedure::error::Error as ProcedureError;
33use common_telemetry::tracing_context::TracingContext;
34use common_telemetry::{error, info, warn};
35use common_wal::options::WalOptions;
36use futures::future::join_all;
37use snafu::{ensure, OptionExt, ResultExt};
38use store_api::metadata::ColumnMetadata;
39use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, MANIFEST_INFO_EXTENSION_KEY};
40use store_api::region_engine::RegionManifestInfo;
41use store_api::storage::{RegionId, RegionNumber};
42use table::metadata::TableId;
43use table::table_reference::TableReference;
44
45use crate::ddl::{DdlContext, DetectingRegion};
46use crate::error::{
47 self, DecodeJsonSnafu, Error, MetadataCorruptionSnafu, OperateDatanodeSnafu,
48 ParseWalOptionsSnafu, Result, TableNotFoundSnafu, UnsupportedSnafu,
49};
50use crate::key::datanode_table::DatanodeTableValue;
51use crate::key::table_name::TableNameKey;
52use crate::key::table_route::TableRouteValue;
53use crate::key::{TableMetadataManager, TableMetadataManagerRef};
54use crate::peer::Peer;
55use crate::rpc::ddl::CreateTableTask;
56use crate::rpc::router::{find_follower_regions, find_followers, RegionRoute};
57
58pub fn add_peer_context_if_needed(datanode: Peer) -> impl FnOnce(Error) -> Error {
60 move |err| {
61 error!(err; "Failed to operate datanode, peer: {}", datanode);
62 if !err.is_retry_later() {
63 return Err::<(), BoxedError>(BoxedError::new(err))
64 .context(OperateDatanodeSnafu { peer: datanode })
65 .unwrap_err();
66 }
67 err
68 }
69}
70
71pub fn map_to_procedure_error(e: Error) -> ProcedureError {
76 match (e.is_retry_later(), e.need_clean_poisons()) {
77 (true, true) => ProcedureError::retry_later_and_clean_poisons(e),
78 (true, false) => ProcedureError::retry_later(e),
79 (false, true) => ProcedureError::external_and_clean_poisons(e),
80 (false, false) => ProcedureError::external(e),
81 }
82}
83
/// Builds the region storage path `"<catalog>/<schema>"`.
#[inline]
pub fn region_storage_path(catalog: &str, schema: &str) -> String {
    format!("{catalog}/{schema}")
}
88
/// Extracts the catalog and schema from a `"<catalog>/<schema>"` path.
///
/// The path is split on `'/'` and only the first two segments are used; any
/// further segments are ignored. Returns `None` when the path has fewer than
/// two segments, i.e. when it contains no `'/'`.
pub fn get_catalog_and_schema(path: &str) -> Option<(String, String)> {
    let mut segments = path.split('/');
    let catalog = segments.next()?;
    let schema = segments.next()?;
    Some((catalog.to_owned(), schema.to_owned()))
}
94
95pub async fn check_and_get_physical_table_id(
96 table_metadata_manager: &TableMetadataManagerRef,
97 tasks: &[CreateTableTask],
98) -> Result<TableId> {
99 let mut physical_table_name = None;
100 for task in tasks {
101 ensure!(
102 task.create_table.engine == METRIC_ENGINE,
103 UnsupportedSnafu {
104 operation: format!("create table with engine {}", task.create_table.engine)
105 }
106 );
107 let current_physical_table_name = task
108 .create_table
109 .table_options
110 .get(LOGICAL_TABLE_METADATA_KEY)
111 .context(UnsupportedSnafu {
112 operation: format!(
113 "create table without table options {}",
114 LOGICAL_TABLE_METADATA_KEY,
115 ),
116 })?;
117 let current_physical_table_name = TableNameKey::new(
118 &task.create_table.catalog_name,
119 &task.create_table.schema_name,
120 current_physical_table_name,
121 );
122
123 physical_table_name = match physical_table_name {
124 Some(name) => {
125 ensure!(
126 name == current_physical_table_name,
127 UnsupportedSnafu {
128 operation: format!(
129 "create table with different physical table name {} and {}",
130 name, current_physical_table_name
131 )
132 }
133 );
134 Some(name)
135 }
136 None => Some(current_physical_table_name),
137 };
138 }
139 let physical_table_name = physical_table_name.unwrap();
141 table_metadata_manager
142 .table_name_manager()
143 .get(physical_table_name)
144 .await?
145 .with_context(|| TableNotFoundSnafu {
146 table_name: TableReference::from(physical_table_name).to_string(),
147 })
148 .map(|table| table.table_id())
149}
150
151pub async fn get_physical_table_id(
152 table_metadata_manager: &TableMetadataManagerRef,
153 logical_table_name: TableNameKey<'_>,
154) -> Result<TableId> {
155 let logical_table_id = table_metadata_manager
156 .table_name_manager()
157 .get(logical_table_name)
158 .await?
159 .with_context(|| TableNotFoundSnafu {
160 table_name: TableReference::from(logical_table_name).to_string(),
161 })
162 .map(|table| table.table_id())?;
163
164 table_metadata_manager
165 .table_route_manager()
166 .get_physical_table_id(logical_table_id)
167 .await
168}
169
170pub fn convert_region_routes_to_detecting_regions(
172 region_routes: &[RegionRoute],
173) -> Vec<DetectingRegion> {
174 region_routes
175 .iter()
176 .flat_map(|route| {
177 route
178 .leader_peer
179 .as_ref()
180 .map(|peer| (peer.id, route.region.id))
181 })
182 .collect::<Vec<_>>()
183}
184
185pub fn parse_region_wal_options(
187 serialized_options: &HashMap<RegionNumber, String>,
188) -> Result<HashMap<RegionNumber, WalOptions>> {
189 let mut region_wal_options = HashMap::with_capacity(serialized_options.len());
190 for (region_number, wal_options) in serialized_options {
191 let wal_option = serde_json::from_str::<WalOptions>(wal_options)
192 .context(ParseWalOptionsSnafu { wal_options })?;
193 region_wal_options.insert(*region_number, wal_option);
194 }
195 Ok(region_wal_options)
196}
197
198pub async fn get_region_wal_options(
200 table_metadata_manager: &TableMetadataManager,
201 table_route_value: &TableRouteValue,
202 physical_table_id: TableId,
203) -> Result<HashMap<RegionNumber, WalOptions>> {
204 let region_wal_options =
205 if let TableRouteValue::Physical(table_route_value) = &table_route_value {
206 let datanode_table_values = table_metadata_manager
207 .datanode_table_manager()
208 .regions(physical_table_id, table_route_value)
209 .await?;
210 extract_region_wal_options(&datanode_table_values)?
211 } else {
212 HashMap::new()
213 };
214 Ok(region_wal_options)
215}
216
217pub fn extract_region_wal_options(
219 datanode_table_values: &Vec<DatanodeTableValue>,
220) -> Result<HashMap<RegionNumber, WalOptions>> {
221 let mut region_wal_options = HashMap::new();
222 for value in datanode_table_values {
223 let serialized_options = &value.region_info.region_wal_options;
224 let parsed_options = parse_region_wal_options(serialized_options)?;
225 region_wal_options.extend(parsed_options);
226 }
227 Ok(region_wal_options)
228}
229
/// Aggregated outcome of a batch of fallible operations, as produced by
/// [`handle_multiple_results`].
///
/// An error counts as "retryable" when its `is_retry_later()` returns true.
/// Each error-carrying variant holds only the first error of the relevant kind.
pub enum MultipleResults<T> {
    /// Every operation succeeded (also used for an empty batch); holds the
    /// success values.
    Ok(Vec<T>),
    /// Some operations succeeded and every failure was retryable.
    PartialRetryable(Error),
    /// The outcomes were mixed and at least one failure was non-retryable.
    PartialNonRetryable(Error),
    /// Every operation failed with a retryable error.
    AllRetryable(Error),
    /// Every operation failed with a non-retryable error.
    AllNonRetryable(Error),
}
244
245pub fn handle_multiple_results<T: Debug>(results: Vec<Result<T>>) -> MultipleResults<T> {
251 if results.is_empty() {
252 return MultipleResults::Ok(Vec::new());
253 }
254 let num_results = results.len();
255 let mut retryable_results = Vec::new();
256 let mut non_retryable_results = Vec::new();
257 let mut ok_results = Vec::new();
258
259 for result in results {
260 match result {
261 Ok(value) => ok_results.push(value),
262 Err(err) => {
263 if err.is_retry_later() {
264 retryable_results.push(err);
265 } else {
266 non_retryable_results.push(err);
267 }
268 }
269 }
270 }
271
272 common_telemetry::debug!(
273 "retryable_results: {}, non_retryable_results: {}, ok_results: {}",
274 retryable_results.len(),
275 non_retryable_results.len(),
276 ok_results.len()
277 );
278
279 if retryable_results.len() == num_results {
280 return MultipleResults::AllRetryable(retryable_results.into_iter().next().unwrap());
281 } else if non_retryable_results.len() == num_results {
282 warn!("all non retryable results: {}", non_retryable_results.len());
283 for err in &non_retryable_results {
284 error!(err; "non retryable error");
285 }
286 return MultipleResults::AllNonRetryable(non_retryable_results.into_iter().next().unwrap());
287 } else if ok_results.len() == num_results {
288 return MultipleResults::Ok(ok_results);
289 } else if !retryable_results.is_empty()
290 && !ok_results.is_empty()
291 && non_retryable_results.is_empty()
292 {
293 return MultipleResults::PartialRetryable(retryable_results.into_iter().next().unwrap());
294 }
295
296 warn!(
297 "partial non retryable results: {}, retryable results: {}, ok results: {}",
298 non_retryable_results.len(),
299 retryable_results.len(),
300 ok_results.len()
301 );
302 for err in &non_retryable_results {
303 error!(err; "non retryable error");
304 }
305 MultipleResults::PartialNonRetryable(non_retryable_results.into_iter().next().unwrap())
307}
308
309pub fn parse_manifest_infos_from_extensions(
311 extensions: &HashMap<String, Vec<u8>>,
312) -> Result<Vec<(RegionId, RegionManifestInfo)>> {
313 let data_manifest_version =
314 extensions
315 .get(MANIFEST_INFO_EXTENSION_KEY)
316 .context(error::UnexpectedSnafu {
317 err_msg: "manifest info extension not found",
318 })?;
319 let data_manifest_version =
320 RegionManifestInfo::decode_list(data_manifest_version).context(error::SerdeJsonSnafu {})?;
321 Ok(data_manifest_version)
322}
323
324pub fn parse_column_metadatas(
326 extensions: &HashMap<String, Vec<u8>>,
327 key: &str,
328) -> Result<Vec<ColumnMetadata>> {
329 let value = extensions.get(key).context(error::UnexpectedSnafu {
330 err_msg: format!("column metadata extension not found: {}", key),
331 })?;
332 let column_metadatas = ColumnMetadata::decode_list(value).context(error::SerdeJsonSnafu {})?;
333 Ok(column_metadatas)
334}
335
336pub async fn sync_follower_regions(
338 context: &DdlContext,
339 table_id: TableId,
340 results: &[RegionResponse],
341 region_routes: &[RegionRoute],
342 engine: &str,
343) -> Result<()> {
344 if engine != MITO_ENGINE && engine != METRIC_ENGINE {
345 info!(
346 "Skip submitting sync region requests for table_id: {}, engine: {}",
347 table_id, engine
348 );
349 return Ok(());
350 }
351
352 let results = results
353 .iter()
354 .map(|response| parse_manifest_infos_from_extensions(&response.extensions))
355 .collect::<Result<Vec<_>>>()?
356 .into_iter()
357 .flatten()
358 .collect::<HashMap<_, _>>();
359
360 let is_mito_engine = engine == MITO_ENGINE;
361
362 let followers = find_followers(region_routes);
363 if followers.is_empty() {
364 return Ok(());
365 }
366 let mut sync_region_tasks = Vec::with_capacity(followers.len());
367 for datanode in followers {
368 let requester = context.node_manager.datanode(&datanode).await;
369 let regions = find_follower_regions(region_routes, &datanode);
370 for region in regions {
371 let region_id = RegionId::new(table_id, region);
372 let manifest_info = if is_mito_engine {
373 let region_manifest_info =
374 results.get(®ion_id).context(error::UnexpectedSnafu {
375 err_msg: format!("No manifest info found for region {}", region_id),
376 })?;
377 ensure!(
378 region_manifest_info.is_mito(),
379 error::UnexpectedSnafu {
380 err_msg: format!("Region {} is not a mito region", region_id)
381 }
382 );
383 ManifestInfo::MitoManifestInfo(MitoManifestInfo {
384 data_manifest_version: region_manifest_info.data_manifest_version(),
385 })
386 } else {
387 let region_manifest_info =
388 results.get(®ion_id).context(error::UnexpectedSnafu {
389 err_msg: format!("No manifest info found for region {}", region_id),
390 })?;
391 ensure!(
392 region_manifest_info.is_metric(),
393 error::UnexpectedSnafu {
394 err_msg: format!("Region {} is not a metric region", region_id)
395 }
396 );
397 ManifestInfo::MetricManifestInfo(MetricManifestInfo {
398 data_manifest_version: region_manifest_info.data_manifest_version(),
399 metadata_manifest_version: region_manifest_info
400 .metadata_manifest_version()
401 .unwrap_or_default(),
402 })
403 };
404 let request = RegionRequest {
405 header: Some(RegionRequestHeader {
406 tracing_context: TracingContext::from_current_span().to_w3c(),
407 ..Default::default()
408 }),
409 body: Some(region_request::Body::Sync(SyncRequest {
410 region_id: region_id.as_u64(),
411 manifest_info: Some(manifest_info),
412 })),
413 };
414
415 let datanode = datanode.clone();
416 let requester = requester.clone();
417 sync_region_tasks.push(async move {
418 requester
419 .handle(request)
420 .await
421 .map_err(add_peer_context_if_needed(datanode))
422 });
423 }
424 }
425
426 if let Err(err) = join_all(sync_region_tasks)
429 .await
430 .into_iter()
431 .collect::<Result<Vec<_>>>()
432 {
433 error!(err; "Failed to sync follower regions on datanodes, table_id: {}", table_id);
434 }
435 info!("Sync follower regions on datanodes, table_id: {}", table_id);
436
437 Ok(())
438}
439
440pub fn extract_column_metadatas(
442 results: &mut [RegionResponse],
443 key: &str,
444) -> Result<Option<Vec<ColumnMetadata>>> {
445 let schemas = results
446 .iter_mut()
447 .map(|r| r.extensions.remove(key))
448 .collect::<Vec<_>>();
449
450 if schemas.is_empty() {
451 warn!("extract_column_metadatas: no extension key `{key}` found in results");
452 return Ok(None);
453 }
454
455 let first = schemas.first().unwrap();
458 ensure!(
459 schemas.iter().all(|x| x == first),
460 MetadataCorruptionSnafu {
461 err_msg: "The table column metadata schemas from datanodes are not the same."
462 }
463 );
464
465 if let Some(first) = first {
466 let column_metadatas = ColumnMetadata::decode_list(first).context(DecodeJsonSnafu)?;
467 Ok(Some(column_metadatas))
468 } else {
469 Ok(None)
470 }
471}
472
#[cfg(test)]
mod tests {
    use super::*;

    /// A path built by `region_storage_path` must split back into the same
    /// catalog and schema.
    #[test]
    fn test_get_catalog_and_schema() {
        let (expected_catalog, expected_schema) = ("my_catalog", "my_schema");
        let storage_path = region_storage_path(expected_catalog, expected_schema);

        let parsed = get_catalog_and_schema(&storage_path);

        assert_eq!(
            parsed,
            Some((expected_catalog.to_string(), expected_schema.to_string()))
        );
    }
}