common_meta/ddl/utils.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::fmt::Debug;

use api::region::RegionResponse;
use api::v1::region::sync_request::ManifestInfo;
use api::v1::region::{
    region_request, MetricManifestInfo, MitoManifestInfo, RegionRequest, RegionRequestHeader,
    SyncRequest,
};
use common_catalog::consts::{METRIC_ENGINE, MITO_ENGINE};
use common_error::ext::BoxedError;
use common_procedure::error::Error as ProcedureError;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{error, info, warn};
use common_wal::options::WalOptions;
use futures::future::join_all;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, MANIFEST_INFO_EXTENSION_KEY};
use store_api::region_engine::RegionManifestInfo;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;
use table::table_reference::TableReference;

use crate::ddl::{DdlContext, DetectingRegion};
use crate::error::{
    self, Error, OperateDatanodeSnafu, ParseWalOptionsSnafu, Result, TableNotFoundSnafu,
    UnsupportedSnafu,
};
use crate::key::datanode_table::DatanodeTableValue;
use crate::key::table_name::TableNameKey;
use crate::key::TableMetadataManagerRef;
use crate::peer::Peer;
use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{find_follower_regions, find_followers, RegionRoute};

/// Adds the [Peer] context to the error if it is not retryable.
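///
/// # Example
///
/// A minimal sketch mirroring the call sites in this module, where a datanode
/// request error is tagged with the peer that served it (the surrounding
/// variables are illustrative):
///
/// ```ignore
/// requester
///     .handle(request)
///     .await
///     .map_err(add_peer_context_if_needed(datanode.clone()))?;
/// ```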
pub fn add_peer_context_if_needed(datanode: Peer) -> impl FnOnce(Error) -> Error {
    move |err| {
        error!(err; "Failed to operate datanode, peer: {}", datanode);
        if !err.is_retry_later() {
            return Err::<(), BoxedError>(BoxedError::new(err))
                .context(OperateDatanodeSnafu { peer: datanode })
                .unwrap_err();
        }
        err
    }
}

/// Maps the error to the corresponding procedure error.
///
/// This function determines whether the error is retryable and whether poison cleanup is
/// needed, then maps it to the appropriate procedure error variant.
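///
/// # Example
///
/// A minimal sketch of the intended call site; `do_ddl_step` is a hypothetical
/// helper that returns this crate's `Result`:
///
/// ```ignore
/// let output = do_ddl_step().await.map_err(map_to_procedure_error)?;
/// ```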
pub fn map_to_procedure_error(e: Error) -> ProcedureError {
    match (e.is_retry_later(), e.need_clean_poisons()) {
        (true, true) => ProcedureError::retry_later_and_clean_poisons(e),
        (true, false) => ProcedureError::retry_later(e),
        (false, true) => ProcedureError::external_and_clean_poisons(e),
        (false, false) => ProcedureError::external(e),
    }
}

/// Returns the storage path of a region, in the form `{catalog}/{schema}`.
#[inline]
pub fn region_storage_path(catalog: &str, schema: &str) -> String {
    format!("{}/{}", catalog, schema)
}

/// Extracts the catalog and schema from a path created by [region_storage_path].
pub fn get_catalog_and_schema(path: &str) -> Option<(String, String)> {
    let mut split = path.split('/');
    Some((split.next()?.to_string(), split.next()?.to_string()))
}

/// Checks that every task creates a metric engine table referring to the same physical table,
/// then returns the id of that physical table.
pub async fn check_and_get_physical_table_id(
    table_metadata_manager: &TableMetadataManagerRef,
    tasks: &[CreateTableTask],
) -> Result<TableId> {
    let mut physical_table_name = None;
    for task in tasks {
        ensure!(
            task.create_table.engine == METRIC_ENGINE,
            UnsupportedSnafu {
                operation: format!("create table with engine {}", task.create_table.engine)
            }
        );
        let current_physical_table_name = task
            .create_table
            .table_options
            .get(LOGICAL_TABLE_METADATA_KEY)
            .context(UnsupportedSnafu {
                operation: format!(
                    "create table without table options {}",
                    LOGICAL_TABLE_METADATA_KEY,
                ),
            })?;
        let current_physical_table_name = TableNameKey::new(
            &task.create_table.catalog_name,
            &task.create_table.schema_name,
            current_physical_table_name,
        );

        physical_table_name = match physical_table_name {
            Some(name) => {
                ensure!(
                    name == current_physical_table_name,
                    UnsupportedSnafu {
                        operation: format!(
                            "create table with different physical table name {} and {}",
                            name, current_physical_table_name
                        )
                    }
                );
                Some(name)
            }
            None => Some(current_physical_table_name),
        };
    }
    // Safety: the loop above sets `physical_table_name`, so it is `Some` here for non-empty `tasks`.
    let physical_table_name = physical_table_name.unwrap();
    table_metadata_manager
        .table_name_manager()
        .get(physical_table_name)
        .await?
        .with_context(|| TableNotFoundSnafu {
            table_name: TableReference::from(physical_table_name).to_string(),
        })
        .map(|table| table.table_id())
}

/// Resolves the table id of `logical_table_name`, then returns the id of its physical table.
pub async fn get_physical_table_id(
    table_metadata_manager: &TableMetadataManagerRef,
    logical_table_name: TableNameKey<'_>,
) -> Result<TableId> {
    let logical_table_id = table_metadata_manager
        .table_name_manager()
        .get(logical_table_name)
        .await?
        .with_context(|| TableNotFoundSnafu {
            table_name: TableReference::from(logical_table_name).to_string(),
        })
        .map(|table| table.table_id())?;

    table_metadata_manager
        .table_route_manager()
        .get_physical_table_id(logical_table_id)
        .await
}

/// Converts a list of [`RegionRoute`] to a list of [`DetectingRegion`].
pub fn convert_region_routes_to_detecting_regions(
    region_routes: &[RegionRoute],
) -> Vec<DetectingRegion> {
    region_routes
        .iter()
        .flat_map(|route| {
            route
                .leader_peer
                .as_ref()
                .map(|peer| (peer.id, route.region.id))
        })
        .collect::<Vec<_>>()
}

/// Parses [WalOptions] from the serialized strings in the hashmap.
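///
/// # Example
///
/// A hedged sketch of a round trip through the serialized form; it assumes
/// `WalOptions` implements `Default`, `Serialize` and `PartialEq`:
///
/// ```ignore
/// let serialized = HashMap::from([(0u32, serde_json::to_string(&WalOptions::default()).unwrap())]);
/// let parsed = parse_region_wal_options(&serialized).unwrap();
/// assert_eq!(parsed[&0], WalOptions::default());
/// ```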
pub fn parse_region_wal_options(
    serialized_options: &HashMap<RegionNumber, String>,
) -> Result<HashMap<RegionNumber, WalOptions>> {
    let mut region_wal_options = HashMap::with_capacity(serialized_options.len());
    for (region_number, wal_options) in serialized_options {
        let wal_option = serde_json::from_str::<WalOptions>(wal_options)
            .context(ParseWalOptionsSnafu { wal_options })?;
        region_wal_options.insert(*region_number, wal_option);
    }
    Ok(region_wal_options)
}

/// Extracts region wal options from [DatanodeTableValue]s.
pub fn extract_region_wal_options(
    datanode_table_values: &[DatanodeTableValue],
) -> Result<HashMap<RegionNumber, WalOptions>> {
    let mut region_wal_options = HashMap::new();
    for value in datanode_table_values {
        let serialized_options = &value.region_info.region_wal_options;
        let parsed_options = parse_region_wal_options(serialized_options)?;
        region_wal_options.extend(parsed_options);
    }
    Ok(region_wal_options)
}

/// The combined outcome of multiple operations.
///
/// - `Ok`: all operations succeeded.
/// - `PartialRetryable`: some operations succeeded and every failure is retryable; the result is retryable.
/// - `PartialNonRetryable`: a mix of outcomes that contains at least one non-retryable error; the result is not retryable.
/// - `AllRetryable`: all operations failed with retryable errors.
/// - `AllNonRetryable`: all operations failed with non-retryable errors.
pub enum MultipleResults<T> {
    Ok(Vec<T>),
    PartialRetryable(Error),
    PartialNonRetryable(Error),
    AllRetryable(Error),
    AllNonRetryable(Error),
}

/// Classifies the results of multiple region requests (e.g. alter region requests)
/// into a [MultipleResults].
///
/// On partial success, the errors are inspected: if every error is retryable,
/// a retryable variant carrying the first error is returned; otherwise the
/// first non-retryable error is returned.
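///
/// # Example
///
/// A hedged sketch of how a caller might branch on the classification;
/// `retry` and `fail` are hypothetical handlers, not part of this crate:
///
/// ```ignore
/// match handle_multiple_results(results) {
///     MultipleResults::Ok(values) => values,
///     MultipleResults::AllRetryable(e) | MultipleResults::PartialRetryable(e) => return retry(e),
///     MultipleResults::AllNonRetryable(e) | MultipleResults::PartialNonRetryable(e) => return fail(e),
/// }
/// ```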
pub fn handle_multiple_results<T: Debug>(results: Vec<Result<T>>) -> MultipleResults<T> {
    if results.is_empty() {
        return MultipleResults::Ok(Vec::new());
    }
    let num_results = results.len();
    let mut retryable_results = Vec::new();
    let mut non_retryable_results = Vec::new();
    let mut ok_results = Vec::new();

    for result in results {
        match result {
            Ok(value) => ok_results.push(value),
            Err(err) => {
                if err.is_retry_later() {
                    retryable_results.push(err);
                } else {
                    non_retryable_results.push(err);
                }
            }
        }
    }

    common_telemetry::debug!(
        "retryable_results: {}, non_retryable_results: {}, ok_results: {}",
        retryable_results.len(),
        non_retryable_results.len(),
        ok_results.len()
    );

    if retryable_results.len() == num_results {
        return MultipleResults::AllRetryable(retryable_results.into_iter().next().unwrap());
    } else if non_retryable_results.len() == num_results {
        warn!("all non retryable results: {}", non_retryable_results.len());
        for err in &non_retryable_results {
            error!(err; "non retryable error");
        }
        return MultipleResults::AllNonRetryable(non_retryable_results.into_iter().next().unwrap());
    } else if ok_results.len() == num_results {
        return MultipleResults::Ok(ok_results);
    } else if !retryable_results.is_empty()
        && !ok_results.is_empty()
        && non_retryable_results.is_empty()
    {
        return MultipleResults::PartialRetryable(retryable_results.into_iter().next().unwrap());
    }

    warn!(
        "partial non retryable results: {}, retryable results: {}, ok results: {}",
        non_retryable_results.len(),
        retryable_results.len(),
        ok_results.len()
    );
    for err in &non_retryable_results {
        error!(err; "non retryable error");
    }
    // At this point `non_retryable_results` is non-empty, so the `unwrap` below is safe.
    MultipleResults::PartialNonRetryable(non_retryable_results.into_iter().next().unwrap())
}

/// Parses manifest infos from extensions.
pub fn parse_manifest_infos_from_extensions(
    extensions: &HashMap<String, Vec<u8>>,
) -> Result<Vec<(RegionId, RegionManifestInfo)>> {
    let encoded = extensions
        .get(MANIFEST_INFO_EXTENSION_KEY)
        .context(error::UnexpectedSnafu {
            err_msg: "manifest info extension not found",
        })?;
    let manifest_infos =
        RegionManifestInfo::decode_list(encoded).context(error::SerdeJsonSnafu {})?;
    Ok(manifest_infos)
}

/// Syncs follower regions on datanodes with the manifest info carried in `results`.
///
/// Only mito and metric engine tables are synced; syncing is best-effort, so
/// failures are logged but do not fail the caller.
pub async fn sync_follower_regions(
    context: &DdlContext,
    table_id: TableId,
    results: Vec<RegionResponse>,
    region_routes: &[RegionRoute],
    engine: &str,
) -> Result<()> {
    if engine != MITO_ENGINE && engine != METRIC_ENGINE {
        info!(
            "Skip submitting sync region requests for table_id: {}, engine: {}",
            table_id, engine
        );
        return Ok(());
    }

    let results = results
        .into_iter()
        .map(|response| parse_manifest_infos_from_extensions(&response.extensions))
        .collect::<Result<Vec<_>>>()?
        .into_iter()
        .flatten()
        .collect::<HashMap<_, _>>();

    let is_mito_engine = engine == MITO_ENGINE;

    let followers = find_followers(region_routes);
    if followers.is_empty() {
        return Ok(());
    }
    let mut sync_region_tasks = Vec::with_capacity(followers.len());
    for datanode in followers {
        let requester = context.node_manager.datanode(&datanode).await;
        let regions = find_follower_regions(region_routes, &datanode);
        for region in regions {
            let region_id = RegionId::new(table_id, region);
            let manifest_info = if is_mito_engine {
                let region_manifest_info =
                    results.get(&region_id).context(error::UnexpectedSnafu {
                        err_msg: format!("No manifest info found for region {}", region_id),
                    })?;
                ensure!(
                    region_manifest_info.is_mito(),
                    error::UnexpectedSnafu {
                        err_msg: format!("Region {} is not a mito region", region_id)
                    }
                );
                ManifestInfo::MitoManifestInfo(MitoManifestInfo {
                    data_manifest_version: region_manifest_info.data_manifest_version(),
                })
            } else {
                let region_manifest_info =
                    results.get(&region_id).context(error::UnexpectedSnafu {
                        err_msg: format!("No manifest info found for region {}", region_id),
                    })?;
                ensure!(
                    region_manifest_info.is_metric(),
                    error::UnexpectedSnafu {
                        err_msg: format!("Region {} is not a metric region", region_id)
                    }
                );
                ManifestInfo::MetricManifestInfo(MetricManifestInfo {
                    data_manifest_version: region_manifest_info.data_manifest_version(),
                    metadata_manifest_version: region_manifest_info
                        .metadata_manifest_version()
                        .unwrap_or_default(),
                })
            };
            let request = RegionRequest {
                header: Some(RegionRequestHeader {
                    tracing_context: TracingContext::from_current_span().to_w3c(),
                    ..Default::default()
                }),
                body: Some(region_request::Body::Sync(SyncRequest {
                    region_id: region_id.as_u64(),
                    manifest_info: Some(manifest_info),
                })),
            };

            let datanode = datanode.clone();
            let requester = requester.clone();
            sync_region_tasks.push(async move {
                requester
                    .handle(request)
                    .await
                    .map_err(add_peer_context_if_needed(datanode))
            });
        }
    }

    // Failing to sync a region is not critical;
    // we sync the regions on a best-effort basis.
    if let Err(err) = join_all(sync_region_tasks)
        .await
        .into_iter()
        .collect::<Result<Vec<_>>>()
    {
        error!(err; "Failed to sync follower regions on datanodes, table_id: {}", table_id);
    }
    info!("Sync follower regions on datanodes, table_id: {}", table_id);

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_get_catalog_and_schema() {
        let test_catalog = "my_catalog";
        let test_schema = "my_schema";
        let path = region_storage_path(test_catalog, test_schema);
        let (catalog, schema) = get_catalog_and_schema(&path).unwrap();
        assert_eq!(catalog, test_catalog);
        assert_eq!(schema, test_schema);
    }
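
    // Hedged additions: these tests only exercise functions defined in this module.
    #[test]
    fn test_get_catalog_and_schema_edge_cases() {
        // A path without a '/' separator yields no schema.
        assert!(get_catalog_and_schema("only_catalog").is_none());
        // Extra path segments after the schema are ignored.
        let (catalog, schema) = get_catalog_and_schema("my_catalog/my_schema/extra").unwrap();
        assert_eq!(catalog, "my_catalog");
        assert_eq!(schema, "my_schema");
    }

    #[test]
    fn test_handle_multiple_results_ok() {
        // An empty input is classified as `Ok` with no values.
        assert!(matches!(
            handle_multiple_results::<i32>(Vec::new()),
            MultipleResults::Ok(values) if values.is_empty()
        ));
        // All successful results are returned as `Ok` with their values.
        assert!(matches!(
            handle_multiple_results(vec![Ok(1), Ok(2)]),
            MultipleResults::Ok(values) if values == vec![1, 2]
        ));
    }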
}