1use std::collections::HashMap;
16use std::fmt::Debug;
17
18use api::region::RegionResponse;
19use api::v1::region::sync_request::ManifestInfo;
20use api::v1::region::{
21 region_request, MetricManifestInfo, MitoManifestInfo, RegionRequest, RegionRequestHeader,
22 SyncRequest,
23};
24use common_catalog::consts::{METRIC_ENGINE, MITO_ENGINE};
25use common_error::ext::BoxedError;
26use common_procedure::error::Error as ProcedureError;
27use common_telemetry::tracing_context::TracingContext;
28use common_telemetry::{error, info, warn};
29use common_wal::options::WalOptions;
30use futures::future::join_all;
31use snafu::{ensure, OptionExt, ResultExt};
32use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, MANIFEST_INFO_EXTENSION_KEY};
33use store_api::region_engine::RegionManifestInfo;
34use store_api::storage::{RegionId, RegionNumber};
35use table::metadata::TableId;
36use table::table_reference::TableReference;
37
38use crate::ddl::{DdlContext, DetectingRegion};
39use crate::error::{
40 self, Error, OperateDatanodeSnafu, ParseWalOptionsSnafu, Result, TableNotFoundSnafu,
41 UnsupportedSnafu,
42};
43use crate::key::datanode_table::DatanodeTableValue;
44use crate::key::table_name::TableNameKey;
45use crate::key::table_route::TableRouteValue;
46use crate::key::{TableMetadataManager, TableMetadataManagerRef};
47use crate::peer::Peer;
48use crate::rpc::ddl::CreateTableTask;
49use crate::rpc::router::{find_follower_regions, find_followers, RegionRoute};
50
51pub fn add_peer_context_if_needed(datanode: Peer) -> impl FnOnce(Error) -> Error {
53 move |err| {
54 error!(err; "Failed to operate datanode, peer: {}", datanode);
55 if !err.is_retry_later() {
56 return Err::<(), BoxedError>(BoxedError::new(err))
57 .context(OperateDatanodeSnafu { peer: datanode })
58 .unwrap_err();
59 }
60 err
61 }
62}
63
64pub fn map_to_procedure_error(e: Error) -> ProcedureError {
69 match (e.is_retry_later(), e.need_clean_poisons()) {
70 (true, true) => ProcedureError::retry_later_and_clean_poisons(e),
71 (true, false) => ProcedureError::retry_later(e),
72 (false, true) => ProcedureError::external_and_clean_poisons(e),
73 (false, false) => ProcedureError::external(e),
74 }
75}
76
/// Joins `catalog` and `schema` with a single `/` to form a region storage path.
#[inline]
pub fn region_storage_path(catalog: &str, schema: &str) -> String {
    [catalog, schema].join("/")
}
81
/// Splits a `catalog/schema` path back into its two components.
///
/// Only the first two `/`-separated segments are used; any further segments
/// are ignored. Returns `None` when `path` has fewer than two segments.
pub fn get_catalog_and_schema(path: &str) -> Option<(String, String)> {
    let mut segments = path.split('/');
    let catalog = segments.next()?;
    let schema = segments.next()?;
    Some((catalog.to_string(), schema.to_string()))
}
87
88pub async fn check_and_get_physical_table_id(
89 table_metadata_manager: &TableMetadataManagerRef,
90 tasks: &[CreateTableTask],
91) -> Result<TableId> {
92 let mut physical_table_name = None;
93 for task in tasks {
94 ensure!(
95 task.create_table.engine == METRIC_ENGINE,
96 UnsupportedSnafu {
97 operation: format!("create table with engine {}", task.create_table.engine)
98 }
99 );
100 let current_physical_table_name = task
101 .create_table
102 .table_options
103 .get(LOGICAL_TABLE_METADATA_KEY)
104 .context(UnsupportedSnafu {
105 operation: format!(
106 "create table without table options {}",
107 LOGICAL_TABLE_METADATA_KEY,
108 ),
109 })?;
110 let current_physical_table_name = TableNameKey::new(
111 &task.create_table.catalog_name,
112 &task.create_table.schema_name,
113 current_physical_table_name,
114 );
115
116 physical_table_name = match physical_table_name {
117 Some(name) => {
118 ensure!(
119 name == current_physical_table_name,
120 UnsupportedSnafu {
121 operation: format!(
122 "create table with different physical table name {} and {}",
123 name, current_physical_table_name
124 )
125 }
126 );
127 Some(name)
128 }
129 None => Some(current_physical_table_name),
130 };
131 }
132 let physical_table_name = physical_table_name.unwrap();
134 table_metadata_manager
135 .table_name_manager()
136 .get(physical_table_name)
137 .await?
138 .with_context(|| TableNotFoundSnafu {
139 table_name: TableReference::from(physical_table_name).to_string(),
140 })
141 .map(|table| table.table_id())
142}
143
144pub async fn get_physical_table_id(
145 table_metadata_manager: &TableMetadataManagerRef,
146 logical_table_name: TableNameKey<'_>,
147) -> Result<TableId> {
148 let logical_table_id = table_metadata_manager
149 .table_name_manager()
150 .get(logical_table_name)
151 .await?
152 .with_context(|| TableNotFoundSnafu {
153 table_name: TableReference::from(logical_table_name).to_string(),
154 })
155 .map(|table| table.table_id())?;
156
157 table_metadata_manager
158 .table_route_manager()
159 .get_physical_table_id(logical_table_id)
160 .await
161}
162
163pub fn convert_region_routes_to_detecting_regions(
165 region_routes: &[RegionRoute],
166) -> Vec<DetectingRegion> {
167 region_routes
168 .iter()
169 .flat_map(|route| {
170 route
171 .leader_peer
172 .as_ref()
173 .map(|peer| (peer.id, route.region.id))
174 })
175 .collect::<Vec<_>>()
176}
177
178pub fn parse_region_wal_options(
180 serialized_options: &HashMap<RegionNumber, String>,
181) -> Result<HashMap<RegionNumber, WalOptions>> {
182 let mut region_wal_options = HashMap::with_capacity(serialized_options.len());
183 for (region_number, wal_options) in serialized_options {
184 let wal_option = serde_json::from_str::<WalOptions>(wal_options)
185 .context(ParseWalOptionsSnafu { wal_options })?;
186 region_wal_options.insert(*region_number, wal_option);
187 }
188 Ok(region_wal_options)
189}
190
191pub async fn get_region_wal_options(
193 table_metadata_manager: &TableMetadataManager,
194 table_route_value: &TableRouteValue,
195 physical_table_id: TableId,
196) -> Result<HashMap<RegionNumber, WalOptions>> {
197 let region_wal_options =
198 if let TableRouteValue::Physical(table_route_value) = &table_route_value {
199 let datanode_table_values = table_metadata_manager
200 .datanode_table_manager()
201 .regions(physical_table_id, table_route_value)
202 .await?;
203 extract_region_wal_options(&datanode_table_values)?
204 } else {
205 HashMap::new()
206 };
207 Ok(region_wal_options)
208}
209
210pub fn extract_region_wal_options(
212 datanode_table_values: &Vec<DatanodeTableValue>,
213) -> Result<HashMap<RegionNumber, WalOptions>> {
214 let mut region_wal_options = HashMap::new();
215 for value in datanode_table_values {
216 let serialized_options = &value.region_info.region_wal_options;
217 let parsed_options = parse_region_wal_options(serialized_options)?;
218 region_wal_options.extend(parsed_options);
219 }
220 Ok(region_wal_options)
221}
222
223pub enum MultipleResults<T> {
231 Ok(Vec<T>),
232 PartialRetryable(Error),
233 PartialNonRetryable(Error),
234 AllRetryable(Error),
235 AllNonRetryable(Error),
236}
237
238pub fn handle_multiple_results<T: Debug>(results: Vec<Result<T>>) -> MultipleResults<T> {
244 if results.is_empty() {
245 return MultipleResults::Ok(Vec::new());
246 }
247 let num_results = results.len();
248 let mut retryable_results = Vec::new();
249 let mut non_retryable_results = Vec::new();
250 let mut ok_results = Vec::new();
251
252 for result in results {
253 match result {
254 Ok(value) => ok_results.push(value),
255 Err(err) => {
256 if err.is_retry_later() {
257 retryable_results.push(err);
258 } else {
259 non_retryable_results.push(err);
260 }
261 }
262 }
263 }
264
265 common_telemetry::debug!(
266 "retryable_results: {}, non_retryable_results: {}, ok_results: {}",
267 retryable_results.len(),
268 non_retryable_results.len(),
269 ok_results.len()
270 );
271
272 if retryable_results.len() == num_results {
273 return MultipleResults::AllRetryable(retryable_results.into_iter().next().unwrap());
274 } else if non_retryable_results.len() == num_results {
275 warn!("all non retryable results: {}", non_retryable_results.len());
276 for err in &non_retryable_results {
277 error!(err; "non retryable error");
278 }
279 return MultipleResults::AllNonRetryable(non_retryable_results.into_iter().next().unwrap());
280 } else if ok_results.len() == num_results {
281 return MultipleResults::Ok(ok_results);
282 } else if !retryable_results.is_empty()
283 && !ok_results.is_empty()
284 && non_retryable_results.is_empty()
285 {
286 return MultipleResults::PartialRetryable(retryable_results.into_iter().next().unwrap());
287 }
288
289 warn!(
290 "partial non retryable results: {}, retryable results: {}, ok results: {}",
291 non_retryable_results.len(),
292 retryable_results.len(),
293 ok_results.len()
294 );
295 for err in &non_retryable_results {
296 error!(err; "non retryable error");
297 }
298 MultipleResults::PartialNonRetryable(non_retryable_results.into_iter().next().unwrap())
300}
301
302pub fn parse_manifest_infos_from_extensions(
304 extensions: &HashMap<String, Vec<u8>>,
305) -> Result<Vec<(RegionId, RegionManifestInfo)>> {
306 let data_manifest_version =
307 extensions
308 .get(MANIFEST_INFO_EXTENSION_KEY)
309 .context(error::UnexpectedSnafu {
310 err_msg: "manifest info extension not found",
311 })?;
312 let data_manifest_version =
313 RegionManifestInfo::decode_list(data_manifest_version).context(error::SerdeJsonSnafu {})?;
314 Ok(data_manifest_version)
315}
316
317pub async fn sync_follower_regions(
319 context: &DdlContext,
320 table_id: TableId,
321 results: Vec<RegionResponse>,
322 region_routes: &[RegionRoute],
323 engine: &str,
324) -> Result<()> {
325 if engine != MITO_ENGINE && engine != METRIC_ENGINE {
326 info!(
327 "Skip submitting sync region requests for table_id: {}, engine: {}",
328 table_id, engine
329 );
330 return Ok(());
331 }
332
333 let results = results
334 .into_iter()
335 .map(|response| parse_manifest_infos_from_extensions(&response.extensions))
336 .collect::<Result<Vec<_>>>()?
337 .into_iter()
338 .flatten()
339 .collect::<HashMap<_, _>>();
340
341 let is_mito_engine = engine == MITO_ENGINE;
342
343 let followers = find_followers(region_routes);
344 if followers.is_empty() {
345 return Ok(());
346 }
347 let mut sync_region_tasks = Vec::with_capacity(followers.len());
348 for datanode in followers {
349 let requester = context.node_manager.datanode(&datanode).await;
350 let regions = find_follower_regions(region_routes, &datanode);
351 for region in regions {
352 let region_id = RegionId::new(table_id, region);
353 let manifest_info = if is_mito_engine {
354 let region_manifest_info =
355 results.get(®ion_id).context(error::UnexpectedSnafu {
356 err_msg: format!("No manifest info found for region {}", region_id),
357 })?;
358 ensure!(
359 region_manifest_info.is_mito(),
360 error::UnexpectedSnafu {
361 err_msg: format!("Region {} is not a mito region", region_id)
362 }
363 );
364 ManifestInfo::MitoManifestInfo(MitoManifestInfo {
365 data_manifest_version: region_manifest_info.data_manifest_version(),
366 })
367 } else {
368 let region_manifest_info =
369 results.get(®ion_id).context(error::UnexpectedSnafu {
370 err_msg: format!("No manifest info found for region {}", region_id),
371 })?;
372 ensure!(
373 region_manifest_info.is_metric(),
374 error::UnexpectedSnafu {
375 err_msg: format!("Region {} is not a metric region", region_id)
376 }
377 );
378 ManifestInfo::MetricManifestInfo(MetricManifestInfo {
379 data_manifest_version: region_manifest_info.data_manifest_version(),
380 metadata_manifest_version: region_manifest_info
381 .metadata_manifest_version()
382 .unwrap_or_default(),
383 })
384 };
385 let request = RegionRequest {
386 header: Some(RegionRequestHeader {
387 tracing_context: TracingContext::from_current_span().to_w3c(),
388 ..Default::default()
389 }),
390 body: Some(region_request::Body::Sync(SyncRequest {
391 region_id: region_id.as_u64(),
392 manifest_info: Some(manifest_info),
393 })),
394 };
395
396 let datanode = datanode.clone();
397 let requester = requester.clone();
398 sync_region_tasks.push(async move {
399 requester
400 .handle(request)
401 .await
402 .map_err(add_peer_context_if_needed(datanode))
403 });
404 }
405 }
406
407 if let Err(err) = join_all(sync_region_tasks)
410 .await
411 .into_iter()
412 .collect::<Result<Vec<_>>>()
413 {
414 error!(err; "Failed to sync follower regions on datanodes, table_id: {}", table_id);
415 }
416 info!("Sync follower regions on datanodes, table_id: {}", table_id);
417
418 Ok(())
419}
420
#[cfg(test)]
mod tests {
    use super::*;

    /// A path built by `region_storage_path` must split back into the same
    /// catalog and schema.
    #[test]
    fn test_get_catalog_and_schema() {
        let (expected_catalog, expected_schema) = ("my_catalog", "my_schema");
        let path = region_storage_path(expected_catalog, expected_schema);

        assert_eq!(
            get_catalog_and_schema(&path),
            Some((expected_catalog.to_string(), expected_schema.to_string()))
        );
    }
}