common_meta/ddl/
utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::Debug;
17
18use api::region::RegionResponse;
19use api::v1::region::sync_request::ManifestInfo;
20use api::v1::region::{
21    region_request, MetricManifestInfo, MitoManifestInfo, RegionRequest, RegionRequestHeader,
22    SyncRequest,
23};
24use common_catalog::consts::{METRIC_ENGINE, MITO_ENGINE};
25use common_error::ext::BoxedError;
26use common_procedure::error::Error as ProcedureError;
27use common_telemetry::tracing_context::TracingContext;
28use common_telemetry::{error, info, warn};
29use common_wal::options::WalOptions;
30use futures::future::join_all;
31use snafu::{ensure, OptionExt, ResultExt};
32use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, MANIFEST_INFO_EXTENSION_KEY};
33use store_api::region_engine::RegionManifestInfo;
34use store_api::storage::{RegionId, RegionNumber};
35use table::metadata::TableId;
36use table::table_reference::TableReference;
37
38use crate::ddl::{DdlContext, DetectingRegion};
39use crate::error::{
40    self, Error, OperateDatanodeSnafu, ParseWalOptionsSnafu, Result, TableNotFoundSnafu,
41    UnsupportedSnafu,
42};
43use crate::key::datanode_table::DatanodeTableValue;
44use crate::key::table_name::TableNameKey;
45use crate::key::table_route::TableRouteValue;
46use crate::key::{TableMetadataManager, TableMetadataManagerRef};
47use crate::peer::Peer;
48use crate::rpc::ddl::CreateTableTask;
49use crate::rpc::router::{find_follower_regions, find_followers, RegionRoute};
50
51/// Adds [Peer] context if the error is unretryable.
52pub fn add_peer_context_if_needed(datanode: Peer) -> impl FnOnce(Error) -> Error {
53    move |err| {
54        error!(err; "Failed to operate datanode, peer: {}", datanode);
55        if !err.is_retry_later() {
56            return Err::<(), BoxedError>(BoxedError::new(err))
57                .context(OperateDatanodeSnafu { peer: datanode })
58                .unwrap_err();
59        }
60        err
61    }
62}
63
64/// Maps the error to the corresponding procedure error.
65///
66/// This function determines whether the error should be retried and if poison cleanup is needed,
67/// then maps it to the appropriate procedure error variant.
68pub fn map_to_procedure_error(e: Error) -> ProcedureError {
69    match (e.is_retry_later(), e.need_clean_poisons()) {
70        (true, true) => ProcedureError::retry_later_and_clean_poisons(e),
71        (true, false) => ProcedureError::retry_later(e),
72        (false, true) => ProcedureError::external_and_clean_poisons(e),
73        (false, false) => ProcedureError::external(e),
74    }
75}
76
/// Builds the region storage path `"{catalog}/{schema}"`.
///
/// The inverse operation is [get_catalog_and_schema].
#[inline]
pub fn region_storage_path(catalog: &str, schema: &str) -> String {
    [catalog, schema].join("/")
}
81
/// Splits a path created by [region_storage_path] back into `(catalog, schema)`.
///
/// Only the first two `/`-separated segments are used; any further segments are
/// ignored. Returns `None` when the path contains no `/` at all.
pub fn get_catalog_and_schema(path: &str) -> Option<(String, String)> {
    let mut segments = path.split('/').map(str::to_string);
    let catalog = segments.next()?;
    let schema = segments.next()?;
    Some((catalog, schema))
}
87
88pub async fn check_and_get_physical_table_id(
89    table_metadata_manager: &TableMetadataManagerRef,
90    tasks: &[CreateTableTask],
91) -> Result<TableId> {
92    let mut physical_table_name = None;
93    for task in tasks {
94        ensure!(
95            task.create_table.engine == METRIC_ENGINE,
96            UnsupportedSnafu {
97                operation: format!("create table with engine {}", task.create_table.engine)
98            }
99        );
100        let current_physical_table_name = task
101            .create_table
102            .table_options
103            .get(LOGICAL_TABLE_METADATA_KEY)
104            .context(UnsupportedSnafu {
105                operation: format!(
106                    "create table without table options {}",
107                    LOGICAL_TABLE_METADATA_KEY,
108                ),
109            })?;
110        let current_physical_table_name = TableNameKey::new(
111            &task.create_table.catalog_name,
112            &task.create_table.schema_name,
113            current_physical_table_name,
114        );
115
116        physical_table_name = match physical_table_name {
117            Some(name) => {
118                ensure!(
119                    name == current_physical_table_name,
120                    UnsupportedSnafu {
121                        operation: format!(
122                            "create table with different physical table name {} and {}",
123                            name, current_physical_table_name
124                        )
125                    }
126                );
127                Some(name)
128            }
129            None => Some(current_physical_table_name),
130        };
131    }
132    // Safety: `physical_table_name` is `Some` here
133    let physical_table_name = physical_table_name.unwrap();
134    table_metadata_manager
135        .table_name_manager()
136        .get(physical_table_name)
137        .await?
138        .with_context(|| TableNotFoundSnafu {
139            table_name: TableReference::from(physical_table_name).to_string(),
140        })
141        .map(|table| table.table_id())
142}
143
144pub async fn get_physical_table_id(
145    table_metadata_manager: &TableMetadataManagerRef,
146    logical_table_name: TableNameKey<'_>,
147) -> Result<TableId> {
148    let logical_table_id = table_metadata_manager
149        .table_name_manager()
150        .get(logical_table_name)
151        .await?
152        .with_context(|| TableNotFoundSnafu {
153            table_name: TableReference::from(logical_table_name).to_string(),
154        })
155        .map(|table| table.table_id())?;
156
157    table_metadata_manager
158        .table_route_manager()
159        .get_physical_table_id(logical_table_id)
160        .await
161}
162
163/// Converts a list of [`RegionRoute`] to a list of [`DetectingRegion`].
164pub fn convert_region_routes_to_detecting_regions(
165    region_routes: &[RegionRoute],
166) -> Vec<DetectingRegion> {
167    region_routes
168        .iter()
169        .flat_map(|route| {
170            route
171                .leader_peer
172                .as_ref()
173                .map(|peer| (peer.id, route.region.id))
174        })
175        .collect::<Vec<_>>()
176}
177
178/// Parses [WalOptions] from serialized strings in hashmap.
179pub fn parse_region_wal_options(
180    serialized_options: &HashMap<RegionNumber, String>,
181) -> Result<HashMap<RegionNumber, WalOptions>> {
182    let mut region_wal_options = HashMap::with_capacity(serialized_options.len());
183    for (region_number, wal_options) in serialized_options {
184        let wal_option = serde_json::from_str::<WalOptions>(wal_options)
185            .context(ParseWalOptionsSnafu { wal_options })?;
186        region_wal_options.insert(*region_number, wal_option);
187    }
188    Ok(region_wal_options)
189}
190
191/// Gets the wal options for a table.
192pub async fn get_region_wal_options(
193    table_metadata_manager: &TableMetadataManager,
194    table_route_value: &TableRouteValue,
195    physical_table_id: TableId,
196) -> Result<HashMap<RegionNumber, WalOptions>> {
197    let region_wal_options =
198        if let TableRouteValue::Physical(table_route_value) = &table_route_value {
199            let datanode_table_values = table_metadata_manager
200                .datanode_table_manager()
201                .regions(physical_table_id, table_route_value)
202                .await?;
203            extract_region_wal_options(&datanode_table_values)?
204        } else {
205            HashMap::new()
206        };
207    Ok(region_wal_options)
208}
209
210/// Extracts region wal options from [DatanodeTableValue]s.
211pub fn extract_region_wal_options(
212    datanode_table_values: &Vec<DatanodeTableValue>,
213) -> Result<HashMap<RegionNumber, WalOptions>> {
214    let mut region_wal_options = HashMap::new();
215    for value in datanode_table_values {
216        let serialized_options = &value.region_info.region_wal_options;
217        let parsed_options = parse_region_wal_options(serialized_options)?;
218        region_wal_options.extend(parsed_options);
219    }
220    Ok(region_wal_options)
221}
222
223/// The result of multiple operations.
224///
225/// - Ok: all operations are successful.
226/// - PartialRetryable: if any operation is retryable and without non retryable error, the result is retryable.
227/// - PartialNonRetryable: if any operation is non retryable, the result is non retryable.
228/// - AllRetryable: all operations are retryable.
229/// - AllNonRetryable: all operations are not retryable.
230pub enum MultipleResults<T> {
231    Ok(Vec<T>),
232    PartialRetryable(Error),
233    PartialNonRetryable(Error),
234    AllRetryable(Error),
235    AllNonRetryable(Error),
236}
237
238/// Handles the results of alter region requests.
239///
240/// For partial success, we need to check if the errors are retryable.
241/// If all the errors are retryable, we return a retryable error.
242/// Otherwise, we return the first error.
243pub fn handle_multiple_results<T: Debug>(results: Vec<Result<T>>) -> MultipleResults<T> {
244    if results.is_empty() {
245        return MultipleResults::Ok(Vec::new());
246    }
247    let num_results = results.len();
248    let mut retryable_results = Vec::new();
249    let mut non_retryable_results = Vec::new();
250    let mut ok_results = Vec::new();
251
252    for result in results {
253        match result {
254            Ok(value) => ok_results.push(value),
255            Err(err) => {
256                if err.is_retry_later() {
257                    retryable_results.push(err);
258                } else {
259                    non_retryable_results.push(err);
260                }
261            }
262        }
263    }
264
265    common_telemetry::debug!(
266        "retryable_results: {}, non_retryable_results: {}, ok_results: {}",
267        retryable_results.len(),
268        non_retryable_results.len(),
269        ok_results.len()
270    );
271
272    if retryable_results.len() == num_results {
273        return MultipleResults::AllRetryable(retryable_results.into_iter().next().unwrap());
274    } else if non_retryable_results.len() == num_results {
275        warn!("all non retryable results: {}", non_retryable_results.len());
276        for err in &non_retryable_results {
277            error!(err; "non retryable error");
278        }
279        return MultipleResults::AllNonRetryable(non_retryable_results.into_iter().next().unwrap());
280    } else if ok_results.len() == num_results {
281        return MultipleResults::Ok(ok_results);
282    } else if !retryable_results.is_empty()
283        && !ok_results.is_empty()
284        && non_retryable_results.is_empty()
285    {
286        return MultipleResults::PartialRetryable(retryable_results.into_iter().next().unwrap());
287    }
288
289    warn!(
290        "partial non retryable results: {}, retryable results: {}, ok results: {}",
291        non_retryable_results.len(),
292        retryable_results.len(),
293        ok_results.len()
294    );
295    for err in &non_retryable_results {
296        error!(err; "non retryable error");
297    }
298    // non_retryable_results.len() > 0
299    MultipleResults::PartialNonRetryable(non_retryable_results.into_iter().next().unwrap())
300}
301
302/// Parses manifest infos from extensions.
303pub fn parse_manifest_infos_from_extensions(
304    extensions: &HashMap<String, Vec<u8>>,
305) -> Result<Vec<(RegionId, RegionManifestInfo)>> {
306    let data_manifest_version =
307        extensions
308            .get(MANIFEST_INFO_EXTENSION_KEY)
309            .context(error::UnexpectedSnafu {
310                err_msg: "manifest info extension not found",
311            })?;
312    let data_manifest_version =
313        RegionManifestInfo::decode_list(data_manifest_version).context(error::SerdeJsonSnafu {})?;
314    Ok(data_manifest_version)
315}
316
317/// Sync follower regions on datanodes.
318pub async fn sync_follower_regions(
319    context: &DdlContext,
320    table_id: TableId,
321    results: Vec<RegionResponse>,
322    region_routes: &[RegionRoute],
323    engine: &str,
324) -> Result<()> {
325    if engine != MITO_ENGINE && engine != METRIC_ENGINE {
326        info!(
327            "Skip submitting sync region requests for table_id: {}, engine: {}",
328            table_id, engine
329        );
330        return Ok(());
331    }
332
333    let results = results
334        .into_iter()
335        .map(|response| parse_manifest_infos_from_extensions(&response.extensions))
336        .collect::<Result<Vec<_>>>()?
337        .into_iter()
338        .flatten()
339        .collect::<HashMap<_, _>>();
340
341    let is_mito_engine = engine == MITO_ENGINE;
342
343    let followers = find_followers(region_routes);
344    if followers.is_empty() {
345        return Ok(());
346    }
347    let mut sync_region_tasks = Vec::with_capacity(followers.len());
348    for datanode in followers {
349        let requester = context.node_manager.datanode(&datanode).await;
350        let regions = find_follower_regions(region_routes, &datanode);
351        for region in regions {
352            let region_id = RegionId::new(table_id, region);
353            let manifest_info = if is_mito_engine {
354                let region_manifest_info =
355                    results.get(&region_id).context(error::UnexpectedSnafu {
356                        err_msg: format!("No manifest info found for region {}", region_id),
357                    })?;
358                ensure!(
359                    region_manifest_info.is_mito(),
360                    error::UnexpectedSnafu {
361                        err_msg: format!("Region {} is not a mito region", region_id)
362                    }
363                );
364                ManifestInfo::MitoManifestInfo(MitoManifestInfo {
365                    data_manifest_version: region_manifest_info.data_manifest_version(),
366                })
367            } else {
368                let region_manifest_info =
369                    results.get(&region_id).context(error::UnexpectedSnafu {
370                        err_msg: format!("No manifest info found for region {}", region_id),
371                    })?;
372                ensure!(
373                    region_manifest_info.is_metric(),
374                    error::UnexpectedSnafu {
375                        err_msg: format!("Region {} is not a metric region", region_id)
376                    }
377                );
378                ManifestInfo::MetricManifestInfo(MetricManifestInfo {
379                    data_manifest_version: region_manifest_info.data_manifest_version(),
380                    metadata_manifest_version: region_manifest_info
381                        .metadata_manifest_version()
382                        .unwrap_or_default(),
383                })
384            };
385            let request = RegionRequest {
386                header: Some(RegionRequestHeader {
387                    tracing_context: TracingContext::from_current_span().to_w3c(),
388                    ..Default::default()
389                }),
390                body: Some(region_request::Body::Sync(SyncRequest {
391                    region_id: region_id.as_u64(),
392                    manifest_info: Some(manifest_info),
393                })),
394            };
395
396            let datanode = datanode.clone();
397            let requester = requester.clone();
398            sync_region_tasks.push(async move {
399                requester
400                    .handle(request)
401                    .await
402                    .map_err(add_peer_context_if_needed(datanode))
403            });
404        }
405    }
406
407    // Failure to sync region is not critical.
408    // We try our best to sync the regions.
409    if let Err(err) = join_all(sync_region_tasks)
410        .await
411        .into_iter()
412        .collect::<Result<Vec<_>>>()
413    {
414        error!(err; "Failed to sync follower regions on datanodes, table_id: {}", table_id);
415    }
416    info!("Sync follower regions on datanodes, table_id: {}", table_id);
417
418    Ok(())
419}
420
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_get_catalog_and_schema() {
        let test_catalog = "my_catalog";
        let test_schema = "my_schema";
        let path = region_storage_path(test_catalog, test_schema);
        let (catalog, schema) = get_catalog_and_schema(&path).unwrap();
        assert_eq!(catalog, test_catalog);
        assert_eq!(schema, test_schema);
    }

    #[test]
    fn test_get_catalog_and_schema_malformed_path() {
        // Without a `/` separator there is no schema segment.
        assert!(get_catalog_and_schema("my_catalog").is_none());
        assert!(get_catalog_and_schema("").is_none());
        // Only the first two segments are used; trailing segments are ignored.
        assert_eq!(
            get_catalog_and_schema("my_catalog/my_schema/extra"),
            Some(("my_catalog".to_string(), "my_schema".to_string()))
        );
    }
}