1use std::collections::HashMap;
16use std::fmt::Debug;
17
18use api::region::RegionResponse;
19use api::v1::region::sync_request::ManifestInfo;
20use api::v1::region::{
21 region_request, MetricManifestInfo, MitoManifestInfo, RegionRequest, RegionRequestHeader,
22 SyncRequest,
23};
24use common_catalog::consts::{METRIC_ENGINE, MITO_ENGINE};
25use common_error::ext::BoxedError;
26use common_procedure::error::Error as ProcedureError;
27use common_telemetry::tracing_context::TracingContext;
28use common_telemetry::{error, info, warn};
29use common_wal::options::WalOptions;
30use futures::future::join_all;
31use snafu::{ensure, OptionExt, ResultExt};
32use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, MANIFEST_INFO_EXTENSION_KEY};
33use store_api::region_engine::RegionManifestInfo;
34use store_api::storage::{RegionId, RegionNumber};
35use table::metadata::TableId;
36use table::table_reference::TableReference;
37
38use crate::ddl::{DdlContext, DetectingRegion};
39use crate::error::{
40 self, Error, OperateDatanodeSnafu, ParseWalOptionsSnafu, Result, TableNotFoundSnafu,
41 UnsupportedSnafu,
42};
43use crate::key::datanode_table::DatanodeTableValue;
44use crate::key::table_name::TableNameKey;
45use crate::key::TableMetadataManagerRef;
46use crate::peer::Peer;
47use crate::rpc::ddl::CreateTableTask;
48use crate::rpc::router::{find_follower_regions, find_followers, RegionRoute};
49
50pub fn add_peer_context_if_needed(datanode: Peer) -> impl FnOnce(Error) -> Error {
52 move |err| {
53 error!(err; "Failed to operate datanode, peer: {}", datanode);
54 if !err.is_retry_later() {
55 return Err::<(), BoxedError>(BoxedError::new(err))
56 .context(OperateDatanodeSnafu { peer: datanode })
57 .unwrap_err();
58 }
59 err
60 }
61}
62
63pub fn handle_retry_error(e: Error) -> ProcedureError {
64 if e.is_retry_later() {
65 ProcedureError::retry_later(e)
66 } else {
67 ProcedureError::external(e)
68 }
69}
70
/// Builds the region storage path `"<catalog>/<schema>"`.
///
/// No validation or escaping is applied to either component.
#[inline]
pub fn region_storage_path(catalog: &str, schema: &str) -> String {
    [catalog, schema].join("/")
}
75
/// Extracts the catalog and schema from a `"<catalog>/<schema>"` path.
///
/// Only the first two `/`-separated segments are used; any further segments
/// are ignored. Returns `None` when the path has fewer than two segments.
pub fn get_catalog_and_schema(path: &str) -> Option<(String, String)> {
    let mut segments = path.split('/');
    let catalog = segments.next()?;
    let schema = segments.next()?;
    Some((catalog.to_string(), schema.to_string()))
}
81
82pub async fn check_and_get_physical_table_id(
83 table_metadata_manager: &TableMetadataManagerRef,
84 tasks: &[CreateTableTask],
85) -> Result<TableId> {
86 let mut physical_table_name = None;
87 for task in tasks {
88 ensure!(
89 task.create_table.engine == METRIC_ENGINE,
90 UnsupportedSnafu {
91 operation: format!("create table with engine {}", task.create_table.engine)
92 }
93 );
94 let current_physical_table_name = task
95 .create_table
96 .table_options
97 .get(LOGICAL_TABLE_METADATA_KEY)
98 .context(UnsupportedSnafu {
99 operation: format!(
100 "create table without table options {}",
101 LOGICAL_TABLE_METADATA_KEY,
102 ),
103 })?;
104 let current_physical_table_name = TableNameKey::new(
105 &task.create_table.catalog_name,
106 &task.create_table.schema_name,
107 current_physical_table_name,
108 );
109
110 physical_table_name = match physical_table_name {
111 Some(name) => {
112 ensure!(
113 name == current_physical_table_name,
114 UnsupportedSnafu {
115 operation: format!(
116 "create table with different physical table name {} and {}",
117 name, current_physical_table_name
118 )
119 }
120 );
121 Some(name)
122 }
123 None => Some(current_physical_table_name),
124 };
125 }
126 let physical_table_name = physical_table_name.unwrap();
128 table_metadata_manager
129 .table_name_manager()
130 .get(physical_table_name)
131 .await?
132 .with_context(|| TableNotFoundSnafu {
133 table_name: TableReference::from(physical_table_name).to_string(),
134 })
135 .map(|table| table.table_id())
136}
137
138pub async fn get_physical_table_id(
139 table_metadata_manager: &TableMetadataManagerRef,
140 logical_table_name: TableNameKey<'_>,
141) -> Result<TableId> {
142 let logical_table_id = table_metadata_manager
143 .table_name_manager()
144 .get(logical_table_name)
145 .await?
146 .with_context(|| TableNotFoundSnafu {
147 table_name: TableReference::from(logical_table_name).to_string(),
148 })
149 .map(|table| table.table_id())?;
150
151 table_metadata_manager
152 .table_route_manager()
153 .get_physical_table_id(logical_table_id)
154 .await
155}
156
157pub fn convert_region_routes_to_detecting_regions(
159 region_routes: &[RegionRoute],
160) -> Vec<DetectingRegion> {
161 region_routes
162 .iter()
163 .flat_map(|route| {
164 route
165 .leader_peer
166 .as_ref()
167 .map(|peer| (peer.id, route.region.id))
168 })
169 .collect::<Vec<_>>()
170}
171
172pub fn parse_region_wal_options(
174 serialized_options: &HashMap<RegionNumber, String>,
175) -> Result<HashMap<RegionNumber, WalOptions>> {
176 let mut region_wal_options = HashMap::with_capacity(serialized_options.len());
177 for (region_number, wal_options) in serialized_options {
178 let wal_option = serde_json::from_str::<WalOptions>(wal_options)
179 .context(ParseWalOptionsSnafu { wal_options })?;
180 region_wal_options.insert(*region_number, wal_option);
181 }
182 Ok(region_wal_options)
183}
184
185pub fn extract_region_wal_options(
187 datanode_table_values: &Vec<DatanodeTableValue>,
188) -> Result<HashMap<RegionNumber, WalOptions>> {
189 let mut region_wal_options = HashMap::new();
190 for value in datanode_table_values {
191 let serialized_options = &value.region_info.region_wal_options;
192 let parsed_options = parse_region_wal_options(serialized_options)?;
193 region_wal_options.extend(parsed_options);
194 }
195 Ok(region_wal_options)
196}
197
/// The aggregated outcome of a batch of fallible operations, as classified by
/// [`handle_multiple_results`].
///
/// "Retryable" means the error reports `is_retry_later()`.
pub enum MultipleResults<T> {
    /// Every operation succeeded (or the batch was empty); holds all success values.
    Ok(Vec<T>),
    /// Some operations succeeded and all failures were retryable;
    /// holds the first retryable error.
    PartialRetryable(Error),
    /// A mix of outcomes that includes at least one non-retryable error;
    /// holds the first non-retryable error.
    PartialNonRetryable(Error),
    /// Every operation failed with a retryable error; holds the first one.
    AllRetryable(Error),
    /// Every operation failed with a non-retryable error; holds the first one.
    AllNonRetryable(Error),
}
212
213pub fn handle_multiple_results<T: Debug>(results: Vec<Result<T>>) -> MultipleResults<T> {
219 if results.is_empty() {
220 return MultipleResults::Ok(Vec::new());
221 }
222 let num_results = results.len();
223 let mut retryable_results = Vec::new();
224 let mut non_retryable_results = Vec::new();
225 let mut ok_results = Vec::new();
226
227 for result in results {
228 match result {
229 Ok(value) => ok_results.push(value),
230 Err(err) => {
231 if err.is_retry_later() {
232 retryable_results.push(err);
233 } else {
234 non_retryable_results.push(err);
235 }
236 }
237 }
238 }
239
240 common_telemetry::debug!(
241 "retryable_results: {}, non_retryable_results: {}, ok_results: {}",
242 retryable_results.len(),
243 non_retryable_results.len(),
244 ok_results.len()
245 );
246
247 if retryable_results.len() == num_results {
248 return MultipleResults::AllRetryable(retryable_results.into_iter().next().unwrap());
249 } else if non_retryable_results.len() == num_results {
250 warn!("all non retryable results: {}", non_retryable_results.len());
251 for err in &non_retryable_results {
252 error!(err; "non retryable error");
253 }
254 return MultipleResults::AllNonRetryable(non_retryable_results.into_iter().next().unwrap());
255 } else if ok_results.len() == num_results {
256 return MultipleResults::Ok(ok_results);
257 } else if !retryable_results.is_empty()
258 && !ok_results.is_empty()
259 && non_retryable_results.is_empty()
260 {
261 return MultipleResults::PartialRetryable(retryable_results.into_iter().next().unwrap());
262 }
263
264 warn!(
265 "partial non retryable results: {}, retryable results: {}, ok results: {}",
266 non_retryable_results.len(),
267 retryable_results.len(),
268 ok_results.len()
269 );
270 for err in &non_retryable_results {
271 error!(err; "non retryable error");
272 }
273 MultipleResults::PartialNonRetryable(non_retryable_results.into_iter().next().unwrap())
275}
276
277pub fn parse_manifest_infos_from_extensions(
279 extensions: &HashMap<String, Vec<u8>>,
280) -> Result<Vec<(RegionId, RegionManifestInfo)>> {
281 let data_manifest_version =
282 extensions
283 .get(MANIFEST_INFO_EXTENSION_KEY)
284 .context(error::UnexpectedSnafu {
285 err_msg: "manifest info extension not found",
286 })?;
287 let data_manifest_version =
288 RegionManifestInfo::decode_list(data_manifest_version).context(error::SerdeJsonSnafu {})?;
289 Ok(data_manifest_version)
290}
291
292pub async fn sync_follower_regions(
294 context: &DdlContext,
295 table_id: TableId,
296 results: Vec<RegionResponse>,
297 region_routes: &[RegionRoute],
298 engine: &str,
299) -> Result<()> {
300 if engine != MITO_ENGINE && engine != METRIC_ENGINE {
301 info!(
302 "Skip submitting sync region requests for table_id: {}, engine: {}",
303 table_id, engine
304 );
305 return Ok(());
306 }
307
308 let results = results
309 .into_iter()
310 .map(|response| parse_manifest_infos_from_extensions(&response.extensions))
311 .collect::<Result<Vec<_>>>()?
312 .into_iter()
313 .flatten()
314 .collect::<HashMap<_, _>>();
315
316 let is_mito_engine = engine == MITO_ENGINE;
317
318 let followers = find_followers(region_routes);
319 if followers.is_empty() {
320 return Ok(());
321 }
322 let mut sync_region_tasks = Vec::with_capacity(followers.len());
323 for datanode in followers {
324 let requester = context.node_manager.datanode(&datanode).await;
325 let regions = find_follower_regions(region_routes, &datanode);
326 for region in regions {
327 let region_id = RegionId::new(table_id, region);
328 let manifest_info = if is_mito_engine {
329 let region_manifest_info =
330 results.get(®ion_id).context(error::UnexpectedSnafu {
331 err_msg: format!("No manifest info found for region {}", region_id),
332 })?;
333 ensure!(
334 region_manifest_info.is_mito(),
335 error::UnexpectedSnafu {
336 err_msg: format!("Region {} is not a mito region", region_id)
337 }
338 );
339 ManifestInfo::MitoManifestInfo(MitoManifestInfo {
340 data_manifest_version: region_manifest_info.data_manifest_version(),
341 })
342 } else {
343 let region_manifest_info =
344 results.get(®ion_id).context(error::UnexpectedSnafu {
345 err_msg: format!("No manifest info found for region {}", region_id),
346 })?;
347 ensure!(
348 region_manifest_info.is_metric(),
349 error::UnexpectedSnafu {
350 err_msg: format!("Region {} is not a metric region", region_id)
351 }
352 );
353 ManifestInfo::MetricManifestInfo(MetricManifestInfo {
354 data_manifest_version: region_manifest_info.data_manifest_version(),
355 metadata_manifest_version: region_manifest_info
356 .metadata_manifest_version()
357 .unwrap_or_default(),
358 })
359 };
360 let request = RegionRequest {
361 header: Some(RegionRequestHeader {
362 tracing_context: TracingContext::from_current_span().to_w3c(),
363 ..Default::default()
364 }),
365 body: Some(region_request::Body::Sync(SyncRequest {
366 region_id: region_id.as_u64(),
367 manifest_info: Some(manifest_info),
368 })),
369 };
370
371 let datanode = datanode.clone();
372 let requester = requester.clone();
373 sync_region_tasks.push(async move {
374 requester
375 .handle(request)
376 .await
377 .map_err(add_peer_context_if_needed(datanode))
378 });
379 }
380 }
381
382 if let Err(err) = join_all(sync_region_tasks)
385 .await
386 .into_iter()
387 .collect::<Result<Vec<_>>>()
388 {
389 error!(err; "Failed to sync follower regions on datanodes, table_id: {}", table_id);
390 }
391 info!("Sync follower regions on datanodes, table_id: {}", table_id);
392
393 Ok(())
394}
395
#[cfg(test)]
mod tests {
    use super::*;

    /// A path built by `region_storage_path` must split back into its components.
    #[test]
    fn test_get_catalog_and_schema() {
        let (expected_catalog, expected_schema) = ("my_catalog", "my_schema");

        let storage_path = region_storage_path(expected_catalog, expected_schema);
        let (catalog, schema) = get_catalog_and_schema(&storage_path).unwrap();

        assert_eq!(catalog, expected_catalog);
        assert_eq!(schema, expected_schema);
    }
}