common_meta/ddl/
utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::Debug;
17
18use api::region::RegionResponse;
19use api::v1::region::sync_request::ManifestInfo;
20use api::v1::region::{
21    region_request, MetricManifestInfo, MitoManifestInfo, RegionRequest, RegionRequestHeader,
22    SyncRequest,
23};
24use common_catalog::consts::{METRIC_ENGINE, MITO_ENGINE};
25use common_error::ext::BoxedError;
26use common_procedure::error::Error as ProcedureError;
27use common_telemetry::tracing_context::TracingContext;
28use common_telemetry::{error, info, warn};
29use common_wal::options::WalOptions;
30use futures::future::join_all;
31use snafu::{ensure, OptionExt, ResultExt};
32use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, MANIFEST_INFO_EXTENSION_KEY};
33use store_api::region_engine::RegionManifestInfo;
34use store_api::storage::{RegionId, RegionNumber};
35use table::metadata::TableId;
36use table::table_reference::TableReference;
37
38use crate::ddl::{DdlContext, DetectingRegion};
39use crate::error::{
40    self, Error, OperateDatanodeSnafu, ParseWalOptionsSnafu, Result, TableNotFoundSnafu,
41    UnsupportedSnafu,
42};
43use crate::key::datanode_table::DatanodeTableValue;
44use crate::key::table_name::TableNameKey;
45use crate::key::TableMetadataManagerRef;
46use crate::peer::Peer;
47use crate::rpc::ddl::CreateTableTask;
48use crate::rpc::router::{find_follower_regions, find_followers, RegionRoute};
49
50/// Adds [Peer] context if the error is unretryable.
51pub fn add_peer_context_if_needed(datanode: Peer) -> impl FnOnce(Error) -> Error {
52    move |err| {
53        error!(err; "Failed to operate datanode, peer: {}", datanode);
54        if !err.is_retry_later() {
55            return Err::<(), BoxedError>(BoxedError::new(err))
56                .context(OperateDatanodeSnafu { peer: datanode })
57                .unwrap_err();
58        }
59        err
60    }
61}
62
63pub fn handle_retry_error(e: Error) -> ProcedureError {
64    if e.is_retry_later() {
65        ProcedureError::retry_later(e)
66    } else {
67        ProcedureError::external(e)
68    }
69}
70
/// Builds the region storage path `"{catalog}/{schema}"`.
///
/// The result can be split back with [get_catalog_and_schema].
#[inline]
pub fn region_storage_path(catalog: &str, schema: &str) -> String {
    [catalog, schema].join("/")
}
75
/// Extracts the catalog and schema from a path built by [region_storage_path].
///
/// The path is split on `/` and the first two segments are returned. Any further
/// segments are ignored. Returns `None` when the path has fewer than two segments
/// (i.e. contains no `/`).
pub fn get_catalog_and_schema(path: &str) -> Option<(String, String)> {
    let mut segments = path.split('/');
    let catalog = segments.next()?;
    let schema = segments.next()?;
    Some((catalog.to_string(), schema.to_string()))
}
81
82pub async fn check_and_get_physical_table_id(
83    table_metadata_manager: &TableMetadataManagerRef,
84    tasks: &[CreateTableTask],
85) -> Result<TableId> {
86    let mut physical_table_name = None;
87    for task in tasks {
88        ensure!(
89            task.create_table.engine == METRIC_ENGINE,
90            UnsupportedSnafu {
91                operation: format!("create table with engine {}", task.create_table.engine)
92            }
93        );
94        let current_physical_table_name = task
95            .create_table
96            .table_options
97            .get(LOGICAL_TABLE_METADATA_KEY)
98            .context(UnsupportedSnafu {
99                operation: format!(
100                    "create table without table options {}",
101                    LOGICAL_TABLE_METADATA_KEY,
102                ),
103            })?;
104        let current_physical_table_name = TableNameKey::new(
105            &task.create_table.catalog_name,
106            &task.create_table.schema_name,
107            current_physical_table_name,
108        );
109
110        physical_table_name = match physical_table_name {
111            Some(name) => {
112                ensure!(
113                    name == current_physical_table_name,
114                    UnsupportedSnafu {
115                        operation: format!(
116                            "create table with different physical table name {} and {}",
117                            name, current_physical_table_name
118                        )
119                    }
120                );
121                Some(name)
122            }
123            None => Some(current_physical_table_name),
124        };
125    }
126    // Safety: `physical_table_name` is `Some` here
127    let physical_table_name = physical_table_name.unwrap();
128    table_metadata_manager
129        .table_name_manager()
130        .get(physical_table_name)
131        .await?
132        .with_context(|| TableNotFoundSnafu {
133            table_name: TableReference::from(physical_table_name).to_string(),
134        })
135        .map(|table| table.table_id())
136}
137
138pub async fn get_physical_table_id(
139    table_metadata_manager: &TableMetadataManagerRef,
140    logical_table_name: TableNameKey<'_>,
141) -> Result<TableId> {
142    let logical_table_id = table_metadata_manager
143        .table_name_manager()
144        .get(logical_table_name)
145        .await?
146        .with_context(|| TableNotFoundSnafu {
147            table_name: TableReference::from(logical_table_name).to_string(),
148        })
149        .map(|table| table.table_id())?;
150
151    table_metadata_manager
152        .table_route_manager()
153        .get_physical_table_id(logical_table_id)
154        .await
155}
156
157/// Converts a list of [`RegionRoute`] to a list of [`DetectingRegion`].
158pub fn convert_region_routes_to_detecting_regions(
159    region_routes: &[RegionRoute],
160) -> Vec<DetectingRegion> {
161    region_routes
162        .iter()
163        .flat_map(|route| {
164            route
165                .leader_peer
166                .as_ref()
167                .map(|peer| (peer.id, route.region.id))
168        })
169        .collect::<Vec<_>>()
170}
171
172/// Parses [WalOptions] from serialized strings in hashmap.
173pub fn parse_region_wal_options(
174    serialized_options: &HashMap<RegionNumber, String>,
175) -> Result<HashMap<RegionNumber, WalOptions>> {
176    let mut region_wal_options = HashMap::with_capacity(serialized_options.len());
177    for (region_number, wal_options) in serialized_options {
178        let wal_option = serde_json::from_str::<WalOptions>(wal_options)
179            .context(ParseWalOptionsSnafu { wal_options })?;
180        region_wal_options.insert(*region_number, wal_option);
181    }
182    Ok(region_wal_options)
183}
184
185/// Extracts region wal options from [DatanodeTableValue]s.
186pub fn extract_region_wal_options(
187    datanode_table_values: &Vec<DatanodeTableValue>,
188) -> Result<HashMap<RegionNumber, WalOptions>> {
189    let mut region_wal_options = HashMap::new();
190    for value in datanode_table_values {
191        let serialized_options = &value.region_info.region_wal_options;
192        let parsed_options = parse_region_wal_options(serialized_options)?;
193        region_wal_options.extend(parsed_options);
194    }
195    Ok(region_wal_options)
196}
197
/// The aggregated outcome of multiple operations, as produced by [handle_multiple_results].
///
/// When produced by [handle_multiple_results], each error-carrying variant holds the first
/// error of the kind that determined the variant.
pub enum MultipleResults<T> {
    /// All operations succeeded (or there were none); holds the successful values.
    Ok(Vec<T>),
    /// Some operations succeeded and every failed operation is retryable.
    PartialRetryable(Error),
    /// At least one operation failed with a non-retryable error, but not all of them did.
    /// The remaining operations may have succeeded or failed retryably.
    PartialNonRetryable(Error),
    /// All operations failed with retryable errors.
    AllRetryable(Error),
    /// All operations failed with non-retryable errors.
    AllNonRetryable(Error),
}
212
213/// Handles the results of alter region requests.
214///
215/// For partial success, we need to check if the errors are retryable.
216/// If all the errors are retryable, we return a retryable error.
217/// Otherwise, we return the first error.
218pub fn handle_multiple_results<T: Debug>(results: Vec<Result<T>>) -> MultipleResults<T> {
219    if results.is_empty() {
220        return MultipleResults::Ok(Vec::new());
221    }
222    let num_results = results.len();
223    let mut retryable_results = Vec::new();
224    let mut non_retryable_results = Vec::new();
225    let mut ok_results = Vec::new();
226
227    for result in results {
228        match result {
229            Ok(value) => ok_results.push(value),
230            Err(err) => {
231                if err.is_retry_later() {
232                    retryable_results.push(err);
233                } else {
234                    non_retryable_results.push(err);
235                }
236            }
237        }
238    }
239
240    common_telemetry::debug!(
241        "retryable_results: {}, non_retryable_results: {}, ok_results: {}",
242        retryable_results.len(),
243        non_retryable_results.len(),
244        ok_results.len()
245    );
246
247    if retryable_results.len() == num_results {
248        return MultipleResults::AllRetryable(retryable_results.into_iter().next().unwrap());
249    } else if non_retryable_results.len() == num_results {
250        warn!("all non retryable results: {}", non_retryable_results.len());
251        for err in &non_retryable_results {
252            error!(err; "non retryable error");
253        }
254        return MultipleResults::AllNonRetryable(non_retryable_results.into_iter().next().unwrap());
255    } else if ok_results.len() == num_results {
256        return MultipleResults::Ok(ok_results);
257    } else if !retryable_results.is_empty()
258        && !ok_results.is_empty()
259        && non_retryable_results.is_empty()
260    {
261        return MultipleResults::PartialRetryable(retryable_results.into_iter().next().unwrap());
262    }
263
264    warn!(
265        "partial non retryable results: {}, retryable results: {}, ok results: {}",
266        non_retryable_results.len(),
267        retryable_results.len(),
268        ok_results.len()
269    );
270    for err in &non_retryable_results {
271        error!(err; "non retryable error");
272    }
273    // non_retryable_results.len() > 0
274    MultipleResults::PartialNonRetryable(non_retryable_results.into_iter().next().unwrap())
275}
276
277/// Parses manifest infos from extensions.
278pub fn parse_manifest_infos_from_extensions(
279    extensions: &HashMap<String, Vec<u8>>,
280) -> Result<Vec<(RegionId, RegionManifestInfo)>> {
281    let data_manifest_version =
282        extensions
283            .get(MANIFEST_INFO_EXTENSION_KEY)
284            .context(error::UnexpectedSnafu {
285                err_msg: "manifest info extension not found",
286            })?;
287    let data_manifest_version =
288        RegionManifestInfo::decode_list(data_manifest_version).context(error::SerdeJsonSnafu {})?;
289    Ok(data_manifest_version)
290}
291
292/// Sync follower regions on datanodes.
293pub async fn sync_follower_regions(
294    context: &DdlContext,
295    table_id: TableId,
296    results: Vec<RegionResponse>,
297    region_routes: &[RegionRoute],
298    engine: &str,
299) -> Result<()> {
300    if engine != MITO_ENGINE && engine != METRIC_ENGINE {
301        info!(
302            "Skip submitting sync region requests for table_id: {}, engine: {}",
303            table_id, engine
304        );
305        return Ok(());
306    }
307
308    let results = results
309        .into_iter()
310        .map(|response| parse_manifest_infos_from_extensions(&response.extensions))
311        .collect::<Result<Vec<_>>>()?
312        .into_iter()
313        .flatten()
314        .collect::<HashMap<_, _>>();
315
316    let is_mito_engine = engine == MITO_ENGINE;
317
318    let followers = find_followers(region_routes);
319    if followers.is_empty() {
320        return Ok(());
321    }
322    let mut sync_region_tasks = Vec::with_capacity(followers.len());
323    for datanode in followers {
324        let requester = context.node_manager.datanode(&datanode).await;
325        let regions = find_follower_regions(region_routes, &datanode);
326        for region in regions {
327            let region_id = RegionId::new(table_id, region);
328            let manifest_info = if is_mito_engine {
329                let region_manifest_info =
330                    results.get(&region_id).context(error::UnexpectedSnafu {
331                        err_msg: format!("No manifest info found for region {}", region_id),
332                    })?;
333                ensure!(
334                    region_manifest_info.is_mito(),
335                    error::UnexpectedSnafu {
336                        err_msg: format!("Region {} is not a mito region", region_id)
337                    }
338                );
339                ManifestInfo::MitoManifestInfo(MitoManifestInfo {
340                    data_manifest_version: region_manifest_info.data_manifest_version(),
341                })
342            } else {
343                let region_manifest_info =
344                    results.get(&region_id).context(error::UnexpectedSnafu {
345                        err_msg: format!("No manifest info found for region {}", region_id),
346                    })?;
347                ensure!(
348                    region_manifest_info.is_metric(),
349                    error::UnexpectedSnafu {
350                        err_msg: format!("Region {} is not a metric region", region_id)
351                    }
352                );
353                ManifestInfo::MetricManifestInfo(MetricManifestInfo {
354                    data_manifest_version: region_manifest_info.data_manifest_version(),
355                    metadata_manifest_version: region_manifest_info
356                        .metadata_manifest_version()
357                        .unwrap_or_default(),
358                })
359            };
360            let request = RegionRequest {
361                header: Some(RegionRequestHeader {
362                    tracing_context: TracingContext::from_current_span().to_w3c(),
363                    ..Default::default()
364                }),
365                body: Some(region_request::Body::Sync(SyncRequest {
366                    region_id: region_id.as_u64(),
367                    manifest_info: Some(manifest_info),
368                })),
369            };
370
371            let datanode = datanode.clone();
372            let requester = requester.clone();
373            sync_region_tasks.push(async move {
374                requester
375                    .handle(request)
376                    .await
377                    .map_err(add_peer_context_if_needed(datanode))
378            });
379        }
380    }
381
382    // Failure to sync region is not critical.
383    // We try our best to sync the regions.
384    if let Err(err) = join_all(sync_region_tasks)
385        .await
386        .into_iter()
387        .collect::<Result<Vec<_>>>()
388    {
389        error!(err; "Failed to sync follower regions on datanodes, table_id: {}", table_id);
390    }
391    info!("Sync follower regions on datanodes, table_id: {}", table_id);
392
393    Ok(())
394}
395
#[cfg(test)]
mod tests {
    use super::*;

    /// A path built by `region_storage_path` must split back into its catalog and schema.
    #[test]
    fn test_get_catalog_and_schema() {
        let (expected_catalog, expected_schema) = ("my_catalog", "my_schema");
        let storage_path = region_storage_path(expected_catalog, expected_schema);
        let parsed = get_catalog_and_schema(&storage_path);
        assert_eq!(
            parsed,
            Some((expected_catalog.to_string(), expected_schema.to_string()))
        );
    }
}