1use std::collections::HashMap;
16use std::fmt::Debug;
17
18use api::region::RegionResponse;
19use api::v1::region::sync_request::ManifestInfo;
20use api::v1::region::{
21 region_request, MetricManifestInfo, MitoManifestInfo, RegionRequest, RegionRequestHeader,
22 SyncRequest,
23};
24use common_catalog::consts::{METRIC_ENGINE, MITO_ENGINE};
25use common_error::ext::BoxedError;
26use common_procedure::error::Error as ProcedureError;
27use common_telemetry::tracing_context::TracingContext;
28use common_telemetry::{error, info, warn};
29use common_wal::options::WalOptions;
30use futures::future::join_all;
31use snafu::{ensure, OptionExt, ResultExt};
32use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, MANIFEST_INFO_EXTENSION_KEY};
33use store_api::region_engine::RegionManifestInfo;
34use store_api::storage::{RegionId, RegionNumber};
35use table::metadata::TableId;
36use table::table_reference::TableReference;
37
38use crate::ddl::{DdlContext, DetectingRegion};
39use crate::error::{
40 self, Error, OperateDatanodeSnafu, ParseWalOptionsSnafu, Result, TableNotFoundSnafu,
41 UnsupportedSnafu,
42};
43use crate::key::datanode_table::DatanodeTableValue;
44use crate::key::table_name::TableNameKey;
45use crate::key::TableMetadataManagerRef;
46use crate::peer::Peer;
47use crate::rpc::ddl::CreateTableTask;
48use crate::rpc::router::{find_follower_regions, find_followers, RegionRoute};
49
50pub fn add_peer_context_if_needed(datanode: Peer) -> impl FnOnce(Error) -> Error {
52 move |err| {
53 error!(err; "Failed to operate datanode, peer: {}", datanode);
54 if !err.is_retry_later() {
55 return Err::<(), BoxedError>(BoxedError::new(err))
56 .context(OperateDatanodeSnafu { peer: datanode })
57 .unwrap_err();
58 }
59 err
60 }
61}
62
63pub fn map_to_procedure_error(e: Error) -> ProcedureError {
68 match (e.is_retry_later(), e.need_clean_poisons()) {
69 (true, true) => ProcedureError::retry_later_and_clean_poisons(e),
70 (true, false) => ProcedureError::retry_later(e),
71 (false, true) => ProcedureError::external_and_clean_poisons(e),
72 (false, false) => ProcedureError::external(e),
73 }
74}
75
/// Builds the region storage path `"<catalog>/<schema>"`.
#[inline]
pub fn region_storage_path(catalog: &str, schema: &str) -> String {
    [catalog, schema].join("/")
}
80
/// Splits a `"<catalog>/<schema>"` path into its catalog and schema parts.
///
/// Returns `None` when `path` contains no `/`. Anything after a second `/`
/// is ignored.
pub fn get_catalog_and_schema(path: &str) -> Option<(String, String)> {
    let (catalog, remainder) = path.split_once('/')?;
    // Only the segment up to the next separator (if any) is the schema.
    let schema = remainder
        .split_once('/')
        .map_or(remainder, |(schema, _)| schema);
    Some((catalog.to_string(), schema.to_string()))
}
86
87pub async fn check_and_get_physical_table_id(
88 table_metadata_manager: &TableMetadataManagerRef,
89 tasks: &[CreateTableTask],
90) -> Result<TableId> {
91 let mut physical_table_name = None;
92 for task in tasks {
93 ensure!(
94 task.create_table.engine == METRIC_ENGINE,
95 UnsupportedSnafu {
96 operation: format!("create table with engine {}", task.create_table.engine)
97 }
98 );
99 let current_physical_table_name = task
100 .create_table
101 .table_options
102 .get(LOGICAL_TABLE_METADATA_KEY)
103 .context(UnsupportedSnafu {
104 operation: format!(
105 "create table without table options {}",
106 LOGICAL_TABLE_METADATA_KEY,
107 ),
108 })?;
109 let current_physical_table_name = TableNameKey::new(
110 &task.create_table.catalog_name,
111 &task.create_table.schema_name,
112 current_physical_table_name,
113 );
114
115 physical_table_name = match physical_table_name {
116 Some(name) => {
117 ensure!(
118 name == current_physical_table_name,
119 UnsupportedSnafu {
120 operation: format!(
121 "create table with different physical table name {} and {}",
122 name, current_physical_table_name
123 )
124 }
125 );
126 Some(name)
127 }
128 None => Some(current_physical_table_name),
129 };
130 }
131 let physical_table_name = physical_table_name.unwrap();
133 table_metadata_manager
134 .table_name_manager()
135 .get(physical_table_name)
136 .await?
137 .with_context(|| TableNotFoundSnafu {
138 table_name: TableReference::from(physical_table_name).to_string(),
139 })
140 .map(|table| table.table_id())
141}
142
143pub async fn get_physical_table_id(
144 table_metadata_manager: &TableMetadataManagerRef,
145 logical_table_name: TableNameKey<'_>,
146) -> Result<TableId> {
147 let logical_table_id = table_metadata_manager
148 .table_name_manager()
149 .get(logical_table_name)
150 .await?
151 .with_context(|| TableNotFoundSnafu {
152 table_name: TableReference::from(logical_table_name).to_string(),
153 })
154 .map(|table| table.table_id())?;
155
156 table_metadata_manager
157 .table_route_manager()
158 .get_physical_table_id(logical_table_id)
159 .await
160}
161
162pub fn convert_region_routes_to_detecting_regions(
164 region_routes: &[RegionRoute],
165) -> Vec<DetectingRegion> {
166 region_routes
167 .iter()
168 .flat_map(|route| {
169 route
170 .leader_peer
171 .as_ref()
172 .map(|peer| (peer.id, route.region.id))
173 })
174 .collect::<Vec<_>>()
175}
176
177pub fn parse_region_wal_options(
179 serialized_options: &HashMap<RegionNumber, String>,
180) -> Result<HashMap<RegionNumber, WalOptions>> {
181 let mut region_wal_options = HashMap::with_capacity(serialized_options.len());
182 for (region_number, wal_options) in serialized_options {
183 let wal_option = serde_json::from_str::<WalOptions>(wal_options)
184 .context(ParseWalOptionsSnafu { wal_options })?;
185 region_wal_options.insert(*region_number, wal_option);
186 }
187 Ok(region_wal_options)
188}
189
190pub fn extract_region_wal_options(
192 datanode_table_values: &Vec<DatanodeTableValue>,
193) -> Result<HashMap<RegionNumber, WalOptions>> {
194 let mut region_wal_options = HashMap::new();
195 for value in datanode_table_values {
196 let serialized_options = &value.region_info.region_wal_options;
197 let parsed_options = parse_region_wal_options(serialized_options)?;
198 region_wal_options.extend(parsed_options);
199 }
200 Ok(region_wal_options)
201}
202
/// Aggregated outcome of a batch of `Result`s, as classified by
/// `handle_multiple_results`.
///
/// Every error-carrying variant holds the first error of the relevant kind
/// in input order.
pub enum MultipleResults<T> {
    /// Every result succeeded (or the batch was empty); holds all values.
    Ok(Vec<T>),
    /// Some results succeeded, the rest failed with retryable errors only.
    PartialRetryable(Error),
    /// At least one non-retryable error occurred alongside successes and/or
    /// retryable errors; holds the first non-retryable error.
    PartialNonRetryable(Error),
    /// Every result failed with a retryable error.
    AllRetryable(Error),
    /// Every result failed with a non-retryable error.
    AllNonRetryable(Error),
}
217
218pub fn handle_multiple_results<T: Debug>(results: Vec<Result<T>>) -> MultipleResults<T> {
224 if results.is_empty() {
225 return MultipleResults::Ok(Vec::new());
226 }
227 let num_results = results.len();
228 let mut retryable_results = Vec::new();
229 let mut non_retryable_results = Vec::new();
230 let mut ok_results = Vec::new();
231
232 for result in results {
233 match result {
234 Ok(value) => ok_results.push(value),
235 Err(err) => {
236 if err.is_retry_later() {
237 retryable_results.push(err);
238 } else {
239 non_retryable_results.push(err);
240 }
241 }
242 }
243 }
244
245 common_telemetry::debug!(
246 "retryable_results: {}, non_retryable_results: {}, ok_results: {}",
247 retryable_results.len(),
248 non_retryable_results.len(),
249 ok_results.len()
250 );
251
252 if retryable_results.len() == num_results {
253 return MultipleResults::AllRetryable(retryable_results.into_iter().next().unwrap());
254 } else if non_retryable_results.len() == num_results {
255 warn!("all non retryable results: {}", non_retryable_results.len());
256 for err in &non_retryable_results {
257 error!(err; "non retryable error");
258 }
259 return MultipleResults::AllNonRetryable(non_retryable_results.into_iter().next().unwrap());
260 } else if ok_results.len() == num_results {
261 return MultipleResults::Ok(ok_results);
262 } else if !retryable_results.is_empty()
263 && !ok_results.is_empty()
264 && non_retryable_results.is_empty()
265 {
266 return MultipleResults::PartialRetryable(retryable_results.into_iter().next().unwrap());
267 }
268
269 warn!(
270 "partial non retryable results: {}, retryable results: {}, ok results: {}",
271 non_retryable_results.len(),
272 retryable_results.len(),
273 ok_results.len()
274 );
275 for err in &non_retryable_results {
276 error!(err; "non retryable error");
277 }
278 MultipleResults::PartialNonRetryable(non_retryable_results.into_iter().next().unwrap())
280}
281
282pub fn parse_manifest_infos_from_extensions(
284 extensions: &HashMap<String, Vec<u8>>,
285) -> Result<Vec<(RegionId, RegionManifestInfo)>> {
286 let data_manifest_version =
287 extensions
288 .get(MANIFEST_INFO_EXTENSION_KEY)
289 .context(error::UnexpectedSnafu {
290 err_msg: "manifest info extension not found",
291 })?;
292 let data_manifest_version =
293 RegionManifestInfo::decode_list(data_manifest_version).context(error::SerdeJsonSnafu {})?;
294 Ok(data_manifest_version)
295}
296
297pub async fn sync_follower_regions(
299 context: &DdlContext,
300 table_id: TableId,
301 results: Vec<RegionResponse>,
302 region_routes: &[RegionRoute],
303 engine: &str,
304) -> Result<()> {
305 if engine != MITO_ENGINE && engine != METRIC_ENGINE {
306 info!(
307 "Skip submitting sync region requests for table_id: {}, engine: {}",
308 table_id, engine
309 );
310 return Ok(());
311 }
312
313 let results = results
314 .into_iter()
315 .map(|response| parse_manifest_infos_from_extensions(&response.extensions))
316 .collect::<Result<Vec<_>>>()?
317 .into_iter()
318 .flatten()
319 .collect::<HashMap<_, _>>();
320
321 let is_mito_engine = engine == MITO_ENGINE;
322
323 let followers = find_followers(region_routes);
324 if followers.is_empty() {
325 return Ok(());
326 }
327 let mut sync_region_tasks = Vec::with_capacity(followers.len());
328 for datanode in followers {
329 let requester = context.node_manager.datanode(&datanode).await;
330 let regions = find_follower_regions(region_routes, &datanode);
331 for region in regions {
332 let region_id = RegionId::new(table_id, region);
333 let manifest_info = if is_mito_engine {
334 let region_manifest_info =
335 results.get(®ion_id).context(error::UnexpectedSnafu {
336 err_msg: format!("No manifest info found for region {}", region_id),
337 })?;
338 ensure!(
339 region_manifest_info.is_mito(),
340 error::UnexpectedSnafu {
341 err_msg: format!("Region {} is not a mito region", region_id)
342 }
343 );
344 ManifestInfo::MitoManifestInfo(MitoManifestInfo {
345 data_manifest_version: region_manifest_info.data_manifest_version(),
346 })
347 } else {
348 let region_manifest_info =
349 results.get(®ion_id).context(error::UnexpectedSnafu {
350 err_msg: format!("No manifest info found for region {}", region_id),
351 })?;
352 ensure!(
353 region_manifest_info.is_metric(),
354 error::UnexpectedSnafu {
355 err_msg: format!("Region {} is not a metric region", region_id)
356 }
357 );
358 ManifestInfo::MetricManifestInfo(MetricManifestInfo {
359 data_manifest_version: region_manifest_info.data_manifest_version(),
360 metadata_manifest_version: region_manifest_info
361 .metadata_manifest_version()
362 .unwrap_or_default(),
363 })
364 };
365 let request = RegionRequest {
366 header: Some(RegionRequestHeader {
367 tracing_context: TracingContext::from_current_span().to_w3c(),
368 ..Default::default()
369 }),
370 body: Some(region_request::Body::Sync(SyncRequest {
371 region_id: region_id.as_u64(),
372 manifest_info: Some(manifest_info),
373 })),
374 };
375
376 let datanode = datanode.clone();
377 let requester = requester.clone();
378 sync_region_tasks.push(async move {
379 requester
380 .handle(request)
381 .await
382 .map_err(add_peer_context_if_needed(datanode))
383 });
384 }
385 }
386
387 if let Err(err) = join_all(sync_region_tasks)
390 .await
391 .into_iter()
392 .collect::<Result<Vec<_>>>()
393 {
394 error!(err; "Failed to sync follower regions on datanodes, table_id: {}", table_id);
395 }
396 info!("Sync follower regions on datanodes, table_id: {}", table_id);
397
398 Ok(())
399}
400
#[cfg(test)]
mod tests {
    use super::*;

    /// A path built by `region_storage_path` must split back into the same
    /// catalog and schema.
    #[test]
    fn test_get_catalog_and_schema() {
        let (expected_catalog, expected_schema) = ("my_catalog", "my_schema");
        let storage_path = region_storage_path(expected_catalog, expected_schema);

        assert_eq!(
            get_catalog_and_schema(&storage_path),
            Some((expected_catalog.to_string(), expected_schema.to_string()))
        );
    }
}