1pub(crate) mod raw_table_info;
16#[allow(dead_code)]
17pub(crate) mod region_metadata_lister;
18pub(crate) mod table_id;
19pub(crate) mod table_info;
20
21use std::collections::HashMap;
22use std::fmt::Debug;
23
24use api::region::RegionResponse;
25use api::v1::region::sync_request::ManifestInfo;
26use api::v1::region::{
27 MetricManifestInfo, MitoManifestInfo, RegionRequest, RegionRequestHeader, SyncRequest,
28 region_request,
29};
30use common_catalog::consts::{METRIC_ENGINE, MITO_ENGINE};
31use common_error::ext::BoxedError;
32use common_procedure::error::Error as ProcedureError;
33use common_telemetry::tracing_context::TracingContext;
34use common_telemetry::{error, info, warn};
35use futures::future::join_all;
36use snafu::{OptionExt, ResultExt, ensure};
37use store_api::metadata::ColumnMetadata;
38use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, MANIFEST_INFO_EXTENSION_KEY};
39use store_api::region_engine::RegionManifestInfo;
40use store_api::storage::RegionId;
41use table::metadata::TableId;
42use table::table_reference::TableReference;
43
44use crate::ddl::{DdlContext, DetectingRegion};
45use crate::error::{
46 self, DecodeJsonSnafu, Error, MetadataCorruptionSnafu, OperateDatanodeSnafu, Result,
47 TableNotFoundSnafu, UnsupportedSnafu,
48};
49use crate::key::datanode_table::DatanodeTableValue;
50use crate::key::table_name::TableNameKey;
51use crate::key::table_route::TableRouteValue;
52use crate::key::{TableMetadataManager, TableMetadataManagerRef};
53use crate::peer::Peer;
54use crate::rpc::ddl::CreateTableTask;
55use crate::rpc::router::{RegionRoute, find_follower_regions, find_followers};
56use crate::wal_provider::RegionWalOptions;
57
58pub fn add_peer_context_if_needed(datanode: Peer) -> impl FnOnce(Error) -> Error {
60 move |err| {
61 error!(err; "Failed to operate datanode, peer: {}", datanode);
62 if !err.is_retry_later() {
63 return Err::<(), BoxedError>(BoxedError::new(err))
64 .context(OperateDatanodeSnafu { peer: datanode })
65 .unwrap_err();
66 }
67 err
68 }
69}
70
71pub fn map_to_procedure_error(e: Error) -> ProcedureError {
76 match (e.is_retry_later(), e.need_clean_poisons()) {
77 (true, true) => ProcedureError::retry_later_and_clean_poisons(e),
78 (true, false) => ProcedureError::retry_later(e),
79 (false, true) => ProcedureError::external_and_clean_poisons(e),
80 (false, false) => ProcedureError::external(e),
81 }
82}
83
/// Builds the region storage path `<catalog>/<schema>`.
#[inline]
pub fn region_storage_path(catalog: &str, schema: &str) -> String {
    [catalog, schema].join("/")
}
88
/// Extracts the catalog and schema from a path of the form `<catalog>/<schema>`.
///
/// Only the first two `/`-separated segments are used; anything after them is ignored.
/// Returns `None` when the path has fewer than two segments.
pub fn get_catalog_and_schema(path: &str) -> Option<(String, String)> {
    let mut segments = path.split('/');
    let catalog = segments.next()?;
    let schema = segments.next()?;
    Some((catalog.to_owned(), schema.to_owned()))
}
94
95pub async fn check_and_get_physical_table_id(
96 table_metadata_manager: &TableMetadataManagerRef,
97 tasks: &[CreateTableTask],
98) -> Result<TableId> {
99 let mut physical_table_name = None;
100 for task in tasks {
101 ensure!(
102 task.create_table.engine == METRIC_ENGINE,
103 UnsupportedSnafu {
104 operation: format!("create table with engine {}", task.create_table.engine)
105 }
106 );
107 let current_physical_table_name = task
108 .create_table
109 .table_options
110 .get(LOGICAL_TABLE_METADATA_KEY)
111 .context(UnsupportedSnafu {
112 operation: format!(
113 "create table without table options {}",
114 LOGICAL_TABLE_METADATA_KEY,
115 ),
116 })?;
117 let current_physical_table_name = TableNameKey::new(
118 &task.create_table.catalog_name,
119 &task.create_table.schema_name,
120 current_physical_table_name,
121 );
122
123 physical_table_name = match physical_table_name {
124 Some(name) => {
125 ensure!(
126 name == current_physical_table_name,
127 UnsupportedSnafu {
128 operation: format!(
129 "create table with different physical table name {} and {}",
130 name, current_physical_table_name
131 )
132 }
133 );
134 Some(name)
135 }
136 None => Some(current_physical_table_name),
137 };
138 }
139 let physical_table_name = physical_table_name.unwrap();
141 table_metadata_manager
142 .table_name_manager()
143 .get(physical_table_name)
144 .await?
145 .with_context(|| TableNotFoundSnafu {
146 table_name: TableReference::from(physical_table_name).to_string(),
147 })
148 .map(|table| table.table_id())
149}
150
151pub async fn get_physical_table_id(
152 table_metadata_manager: &TableMetadataManagerRef,
153 logical_table_name: TableNameKey<'_>,
154) -> Result<TableId> {
155 let logical_table_id = table_metadata_manager
156 .table_name_manager()
157 .get(logical_table_name)
158 .await?
159 .with_context(|| TableNotFoundSnafu {
160 table_name: TableReference::from(logical_table_name).to_string(),
161 })
162 .map(|table| table.table_id())?;
163
164 table_metadata_manager
165 .table_route_manager()
166 .get_physical_table_id(logical_table_id)
167 .await
168}
169
170pub fn convert_region_routes_to_detecting_regions(
172 region_routes: &[RegionRoute],
173) -> Vec<DetectingRegion> {
174 region_routes
175 .iter()
176 .flat_map(|route| {
177 route
178 .leader_peer
179 .as_ref()
180 .map(|peer| (peer.id, route.region.id))
181 })
182 .collect::<Vec<_>>()
183}
184
185pub async fn get_region_wal_options(
187 table_metadata_manager: &TableMetadataManager,
188 table_route_value: &TableRouteValue,
189 physical_table_id: TableId,
190) -> Result<RegionWalOptions> {
191 let region_wal_options =
192 if let TableRouteValue::Physical(table_route_value) = &table_route_value {
193 let datanode_table_values = table_metadata_manager
194 .datanode_table_manager()
195 .regions(physical_table_id, table_route_value)
196 .await?;
197 extract_region_wal_options(&datanode_table_values)?
198 } else {
199 HashMap::new()
200 };
201 Ok(region_wal_options)
202}
203
204pub fn extract_region_wal_options(
206 datanode_table_values: &Vec<DatanodeTableValue>,
207) -> Result<RegionWalOptions> {
208 let mut region_wal_options = RegionWalOptions::new();
209 for value in datanode_table_values {
210 region_wal_options.extend(value.region_info.region_wal_options.clone());
211 }
212 Ok(region_wal_options)
213}
214
/// The aggregated outcome of multiple operations, as produced by
/// [`handle_multiple_results`].
///
/// Every error-carrying variant holds the first error of the relevant class.
pub enum MultipleResults<T> {
    /// All operations succeeded (or there were no operations); holds their values.
    Ok(Vec<T>),
    /// Some operations succeeded and the rest failed with retryable errors only.
    PartialRetryable(Error),
    /// At least one operation failed with a non-retryable error, while others succeeded
    /// or failed with retryable errors.
    PartialNonRetryable(Error),
    /// Every operation failed with a retryable error.
    AllRetryable(Error),
    /// Every operation failed with a non-retryable error.
    AllNonRetryable(Error),
}
229
230pub fn handle_multiple_results<T: Debug>(results: Vec<Result<T>>) -> MultipleResults<T> {
236 if results.is_empty() {
237 return MultipleResults::Ok(Vec::new());
238 }
239 let num_results = results.len();
240 let mut retryable_results = Vec::new();
241 let mut non_retryable_results = Vec::new();
242 let mut ok_results = Vec::new();
243
244 for result in results {
245 match result {
246 Ok(value) => ok_results.push(value),
247 Err(err) => {
248 if err.is_retry_later() {
249 retryable_results.push(err);
250 } else {
251 non_retryable_results.push(err);
252 }
253 }
254 }
255 }
256
257 common_telemetry::debug!(
258 "retryable_results: {}, non_retryable_results: {}, ok_results: {}",
259 retryable_results.len(),
260 non_retryable_results.len(),
261 ok_results.len()
262 );
263
264 if retryable_results.len() == num_results {
265 return MultipleResults::AllRetryable(retryable_results.into_iter().next().unwrap());
266 } else if non_retryable_results.len() == num_results {
267 warn!("all non retryable results: {}", non_retryable_results.len());
268 for err in &non_retryable_results {
269 error!(err; "non retryable error");
270 }
271 return MultipleResults::AllNonRetryable(non_retryable_results.into_iter().next().unwrap());
272 } else if ok_results.len() == num_results {
273 return MultipleResults::Ok(ok_results);
274 } else if !retryable_results.is_empty()
275 && !ok_results.is_empty()
276 && non_retryable_results.is_empty()
277 {
278 return MultipleResults::PartialRetryable(retryable_results.into_iter().next().unwrap());
279 }
280
281 warn!(
282 "partial non retryable results: {}, retryable results: {}, ok results: {}",
283 non_retryable_results.len(),
284 retryable_results.len(),
285 ok_results.len()
286 );
287 for err in &non_retryable_results {
288 error!(err; "non retryable error");
289 }
290 MultipleResults::PartialNonRetryable(non_retryable_results.into_iter().next().unwrap())
292}
293
294pub fn parse_manifest_infos_from_extensions(
296 extensions: &HashMap<String, Vec<u8>>,
297) -> Result<Vec<(RegionId, RegionManifestInfo)>> {
298 let data_manifest_version =
299 extensions
300 .get(MANIFEST_INFO_EXTENSION_KEY)
301 .context(error::UnexpectedSnafu {
302 err_msg: "manifest info extension not found",
303 })?;
304 let data_manifest_version =
305 RegionManifestInfo::decode_list(data_manifest_version).context(error::SerdeJsonSnafu {})?;
306 Ok(data_manifest_version)
307}
308
309pub fn parse_column_metadatas(
311 extensions: &HashMap<String, Vec<u8>>,
312 key: &str,
313) -> Result<Vec<ColumnMetadata>> {
314 let value = extensions.get(key).context(error::UnexpectedSnafu {
315 err_msg: format!("column metadata extension not found: {}", key),
316 })?;
317 let column_metadatas = ColumnMetadata::decode_list(value).context(error::SerdeJsonSnafu {})?;
318 Ok(column_metadatas)
319}
320
321pub async fn sync_follower_regions(
323 context: &DdlContext,
324 table_id: TableId,
325 results: &[RegionResponse],
326 region_routes: &[RegionRoute],
327 engine: &str,
328) -> Result<()> {
329 if engine != MITO_ENGINE && engine != METRIC_ENGINE {
330 info!(
331 "Skip submitting sync region requests for table_id: {}, engine: {}",
332 table_id, engine
333 );
334 return Ok(());
335 }
336
337 let results = results
338 .iter()
339 .map(|response| parse_manifest_infos_from_extensions(&response.extensions))
340 .collect::<Result<Vec<_>>>()?
341 .into_iter()
342 .flatten()
343 .collect::<HashMap<_, _>>();
344
345 let is_mito_engine = engine == MITO_ENGINE;
346
347 let followers = find_followers(region_routes);
348 if followers.is_empty() {
349 return Ok(());
350 }
351 let mut sync_region_tasks = Vec::with_capacity(followers.len());
352 for datanode in followers {
353 let requester = context.node_manager.datanode(&datanode).await;
354 let regions = find_follower_regions(region_routes, &datanode);
355 for region in regions {
356 let region_id = RegionId::new(table_id, region);
357 let manifest_info = if is_mito_engine {
358 let region_manifest_info =
359 results.get(®ion_id).context(error::UnexpectedSnafu {
360 err_msg: format!("No manifest info found for region {}", region_id),
361 })?;
362 ensure!(
363 region_manifest_info.is_mito(),
364 error::UnexpectedSnafu {
365 err_msg: format!("Region {} is not a mito region", region_id)
366 }
367 );
368 ManifestInfo::MitoManifestInfo(MitoManifestInfo {
369 data_manifest_version: region_manifest_info.data_manifest_version(),
370 })
371 } else {
372 let region_manifest_info =
373 results.get(®ion_id).context(error::UnexpectedSnafu {
374 err_msg: format!("No manifest info found for region {}", region_id),
375 })?;
376 ensure!(
377 region_manifest_info.is_metric(),
378 error::UnexpectedSnafu {
379 err_msg: format!("Region {} is not a metric region", region_id)
380 }
381 );
382 ManifestInfo::MetricManifestInfo(MetricManifestInfo {
383 data_manifest_version: region_manifest_info.data_manifest_version(),
384 metadata_manifest_version: region_manifest_info
385 .metadata_manifest_version()
386 .unwrap_or_default(),
387 })
388 };
389 let request = RegionRequest {
390 header: Some(RegionRequestHeader {
391 tracing_context: TracingContext::from_current_span().to_w3c(),
392 ..Default::default()
393 }),
394 body: Some(region_request::Body::Sync(SyncRequest {
395 region_id: region_id.as_u64(),
396 manifest_info: Some(manifest_info),
397 })),
398 };
399
400 let datanode = datanode.clone();
401 let requester = requester.clone();
402 sync_region_tasks.push(async move {
403 requester
404 .handle(request)
405 .await
406 .map_err(add_peer_context_if_needed(datanode))
407 });
408 }
409 }
410
411 if let Err(err) = join_all(sync_region_tasks)
414 .await
415 .into_iter()
416 .collect::<Result<Vec<_>>>()
417 {
418 error!(err; "Failed to sync follower regions on datanodes, table_id: {}", table_id);
419 }
420 info!("Sync follower regions on datanodes, table_id: {}", table_id);
421
422 Ok(())
423}
424
425pub fn extract_column_metadatas(
427 results: &mut [RegionResponse],
428 key: &str,
429) -> Result<Option<Vec<ColumnMetadata>>> {
430 let mut schemas = results
431 .iter_mut()
432 .map(|r| r.extensions.remove(key))
433 .collect::<Vec<_>>();
434
435 if schemas.is_empty() {
436 warn!("extract_column_metadatas: no extension key `{key}` found in results");
437 return Ok(None);
438 }
439
440 let first_column_metadatas = schemas
443 .swap_remove(0)
444 .map(|first_bytes| ColumnMetadata::decode_list(&first_bytes).context(DecodeJsonSnafu))
445 .transpose()?;
446
447 for s in schemas {
448 let column_metadata = s
450 .map(|bytes| ColumnMetadata::decode_list(&bytes).context(DecodeJsonSnafu))
451 .transpose()?;
452 ensure!(
453 column_metadata == first_column_metadatas,
454 MetadataCorruptionSnafu {
455 err_msg: format!(
456 "The table column metadata schemas from datanodes are not the same. First: {:?}, Current: {:?}",
457 first_column_metadatas, column_metadata,
458 ),
459 }
460 );
461 }
462 Ok(first_column_metadatas)
463}
464
#[cfg(test)]
mod tests {
    use super::*;

    /// A path built by `region_storage_path` must parse back into the same catalog and
    /// schema.
    #[test]
    fn test_get_catalog_and_schema() {
        let (test_catalog, test_schema) = ("my_catalog", "my_schema");

        let path = region_storage_path(test_catalog, test_schema);
        let parsed = get_catalog_and_schema(&path);

        assert_eq!(
            parsed,
            Some((test_catalog.to_string(), test_schema.to_string()))
        );
    }
}