common_meta/ddl/
utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod raw_table_info;
16pub(crate) mod table_id;
17pub(crate) mod table_info;
18
19use std::collections::HashMap;
20use std::fmt::Debug;
21
22use api::region::RegionResponse;
23use api::v1::region::sync_request::ManifestInfo;
24use api::v1::region::{
25    region_request, MetricManifestInfo, MitoManifestInfo, RegionRequest, RegionRequestHeader,
26    SyncRequest,
27};
28use common_catalog::consts::{METRIC_ENGINE, MITO_ENGINE};
29use common_error::ext::BoxedError;
30use common_procedure::error::Error as ProcedureError;
31use common_telemetry::tracing_context::TracingContext;
32use common_telemetry::{error, info, warn};
33use common_wal::options::WalOptions;
34use futures::future::join_all;
35use snafu::{ensure, OptionExt, ResultExt};
36use store_api::metadata::ColumnMetadata;
37use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, MANIFEST_INFO_EXTENSION_KEY};
38use store_api::region_engine::RegionManifestInfo;
39use store_api::storage::{RegionId, RegionNumber};
40use table::metadata::TableId;
41use table::table_reference::TableReference;
42
43use crate::ddl::{DdlContext, DetectingRegion};
44use crate::error::{
45    self, DecodeJsonSnafu, Error, MetadataCorruptionSnafu, OperateDatanodeSnafu,
46    ParseWalOptionsSnafu, Result, TableNotFoundSnafu, UnsupportedSnafu,
47};
48use crate::key::datanode_table::DatanodeTableValue;
49use crate::key::table_name::TableNameKey;
50use crate::key::table_route::TableRouteValue;
51use crate::key::{TableMetadataManager, TableMetadataManagerRef};
52use crate::peer::Peer;
53use crate::rpc::ddl::CreateTableTask;
54use crate::rpc::router::{find_follower_regions, find_followers, RegionRoute};
55
56/// Adds [Peer] context if the error is unretryable.
57pub fn add_peer_context_if_needed(datanode: Peer) -> impl FnOnce(Error) -> Error {
58    move |err| {
59        error!(err; "Failed to operate datanode, peer: {}", datanode);
60        if !err.is_retry_later() {
61            return Err::<(), BoxedError>(BoxedError::new(err))
62                .context(OperateDatanodeSnafu { peer: datanode })
63                .unwrap_err();
64        }
65        err
66    }
67}
68
69/// Maps the error to the corresponding procedure error.
70///
71/// This function determines whether the error should be retried and if poison cleanup is needed,
72/// then maps it to the appropriate procedure error variant.
73pub fn map_to_procedure_error(e: Error) -> ProcedureError {
74    match (e.is_retry_later(), e.need_clean_poisons()) {
75        (true, true) => ProcedureError::retry_later_and_clean_poisons(e),
76        (true, false) => ProcedureError::retry_later(e),
77        (false, true) => ProcedureError::external_and_clean_poisons(e),
78        (false, false) => ProcedureError::external(e),
79    }
80}
81
/// Builds the region storage path `<catalog>/<schema>`.
#[inline]
pub fn region_storage_path(catalog: &str, schema: &str) -> String {
    [catalog, schema].join("/")
}
86
/// Extracts the catalog and schema from a path created by [region_storage_path].
///
/// The first two `/`-separated segments are returned; any further segments are ignored.
/// Returns `None` when the path contains no `/`.
pub fn get_catalog_and_schema(path: &str) -> Option<(String, String)> {
    let (catalog, remainder) = path.split_once('/')?;
    // Only the segment up to the next separator (if any) is the schema.
    let schema = remainder.split('/').next().unwrap_or(remainder);
    Some((catalog.to_string(), schema.to_string()))
}
92
93pub async fn check_and_get_physical_table_id(
94    table_metadata_manager: &TableMetadataManagerRef,
95    tasks: &[CreateTableTask],
96) -> Result<TableId> {
97    let mut physical_table_name = None;
98    for task in tasks {
99        ensure!(
100            task.create_table.engine == METRIC_ENGINE,
101            UnsupportedSnafu {
102                operation: format!("create table with engine {}", task.create_table.engine)
103            }
104        );
105        let current_physical_table_name = task
106            .create_table
107            .table_options
108            .get(LOGICAL_TABLE_METADATA_KEY)
109            .context(UnsupportedSnafu {
110                operation: format!(
111                    "create table without table options {}",
112                    LOGICAL_TABLE_METADATA_KEY,
113                ),
114            })?;
115        let current_physical_table_name = TableNameKey::new(
116            &task.create_table.catalog_name,
117            &task.create_table.schema_name,
118            current_physical_table_name,
119        );
120
121        physical_table_name = match physical_table_name {
122            Some(name) => {
123                ensure!(
124                    name == current_physical_table_name,
125                    UnsupportedSnafu {
126                        operation: format!(
127                            "create table with different physical table name {} and {}",
128                            name, current_physical_table_name
129                        )
130                    }
131                );
132                Some(name)
133            }
134            None => Some(current_physical_table_name),
135        };
136    }
137    // Safety: `physical_table_name` is `Some` here
138    let physical_table_name = physical_table_name.unwrap();
139    table_metadata_manager
140        .table_name_manager()
141        .get(physical_table_name)
142        .await?
143        .with_context(|| TableNotFoundSnafu {
144            table_name: TableReference::from(physical_table_name).to_string(),
145        })
146        .map(|table| table.table_id())
147}
148
149pub async fn get_physical_table_id(
150    table_metadata_manager: &TableMetadataManagerRef,
151    logical_table_name: TableNameKey<'_>,
152) -> Result<TableId> {
153    let logical_table_id = table_metadata_manager
154        .table_name_manager()
155        .get(logical_table_name)
156        .await?
157        .with_context(|| TableNotFoundSnafu {
158            table_name: TableReference::from(logical_table_name).to_string(),
159        })
160        .map(|table| table.table_id())?;
161
162    table_metadata_manager
163        .table_route_manager()
164        .get_physical_table_id(logical_table_id)
165        .await
166}
167
168/// Converts a list of [`RegionRoute`] to a list of [`DetectingRegion`].
169pub fn convert_region_routes_to_detecting_regions(
170    region_routes: &[RegionRoute],
171) -> Vec<DetectingRegion> {
172    region_routes
173        .iter()
174        .flat_map(|route| {
175            route
176                .leader_peer
177                .as_ref()
178                .map(|peer| (peer.id, route.region.id))
179        })
180        .collect::<Vec<_>>()
181}
182
183/// Parses [WalOptions] from serialized strings in hashmap.
184pub fn parse_region_wal_options(
185    serialized_options: &HashMap<RegionNumber, String>,
186) -> Result<HashMap<RegionNumber, WalOptions>> {
187    let mut region_wal_options = HashMap::with_capacity(serialized_options.len());
188    for (region_number, wal_options) in serialized_options {
189        let wal_option = serde_json::from_str::<WalOptions>(wal_options)
190            .context(ParseWalOptionsSnafu { wal_options })?;
191        region_wal_options.insert(*region_number, wal_option);
192    }
193    Ok(region_wal_options)
194}
195
196/// Gets the wal options for a table.
197pub async fn get_region_wal_options(
198    table_metadata_manager: &TableMetadataManager,
199    table_route_value: &TableRouteValue,
200    physical_table_id: TableId,
201) -> Result<HashMap<RegionNumber, WalOptions>> {
202    let region_wal_options =
203        if let TableRouteValue::Physical(table_route_value) = &table_route_value {
204            let datanode_table_values = table_metadata_manager
205                .datanode_table_manager()
206                .regions(physical_table_id, table_route_value)
207                .await?;
208            extract_region_wal_options(&datanode_table_values)?
209        } else {
210            HashMap::new()
211        };
212    Ok(region_wal_options)
213}
214
215/// Extracts region wal options from [DatanodeTableValue]s.
216pub fn extract_region_wal_options(
217    datanode_table_values: &Vec<DatanodeTableValue>,
218) -> Result<HashMap<RegionNumber, WalOptions>> {
219    let mut region_wal_options = HashMap::new();
220    for value in datanode_table_values {
221        let serialized_options = &value.region_info.region_wal_options;
222        let parsed_options = parse_region_wal_options(serialized_options)?;
223        region_wal_options.extend(parsed_options);
224    }
225    Ok(region_wal_options)
226}
227
/// The aggregated outcome of multiple operations, as produced by
/// [handle_multiple_results].
///
/// - Ok: all operations are successful.
/// - PartialRetryable: some operations succeeded and every failure is retryable.
/// - PartialNonRetryable: at least one failure is non-retryable, but not all operations
///   failed that way.
/// - AllRetryable: all operations failed with retryable errors.
/// - AllNonRetryable: all operations failed with non-retryable errors.
pub enum MultipleResults<T> {
    /// Every operation succeeded; holds the values of the successful results.
    Ok(Vec<T>),
    /// Holds the first retryable error.
    PartialRetryable(Error),
    /// Holds the first non-retryable error.
    PartialNonRetryable(Error),
    /// Holds the first retryable error.
    AllRetryable(Error),
    /// Holds the first non-retryable error.
    AllNonRetryable(Error),
}
242
243/// Handles the results of alter region requests.
244///
245/// For partial success, we need to check if the errors are retryable.
246/// If all the errors are retryable, we return a retryable error.
247/// Otherwise, we return the first error.
248pub fn handle_multiple_results<T: Debug>(results: Vec<Result<T>>) -> MultipleResults<T> {
249    if results.is_empty() {
250        return MultipleResults::Ok(Vec::new());
251    }
252    let num_results = results.len();
253    let mut retryable_results = Vec::new();
254    let mut non_retryable_results = Vec::new();
255    let mut ok_results = Vec::new();
256
257    for result in results {
258        match result {
259            Ok(value) => ok_results.push(value),
260            Err(err) => {
261                if err.is_retry_later() {
262                    retryable_results.push(err);
263                } else {
264                    non_retryable_results.push(err);
265                }
266            }
267        }
268    }
269
270    common_telemetry::debug!(
271        "retryable_results: {}, non_retryable_results: {}, ok_results: {}",
272        retryable_results.len(),
273        non_retryable_results.len(),
274        ok_results.len()
275    );
276
277    if retryable_results.len() == num_results {
278        return MultipleResults::AllRetryable(retryable_results.into_iter().next().unwrap());
279    } else if non_retryable_results.len() == num_results {
280        warn!("all non retryable results: {}", non_retryable_results.len());
281        for err in &non_retryable_results {
282            error!(err; "non retryable error");
283        }
284        return MultipleResults::AllNonRetryable(non_retryable_results.into_iter().next().unwrap());
285    } else if ok_results.len() == num_results {
286        return MultipleResults::Ok(ok_results);
287    } else if !retryable_results.is_empty()
288        && !ok_results.is_empty()
289        && non_retryable_results.is_empty()
290    {
291        return MultipleResults::PartialRetryable(retryable_results.into_iter().next().unwrap());
292    }
293
294    warn!(
295        "partial non retryable results: {}, retryable results: {}, ok results: {}",
296        non_retryable_results.len(),
297        retryable_results.len(),
298        ok_results.len()
299    );
300    for err in &non_retryable_results {
301        error!(err; "non retryable error");
302    }
303    // non_retryable_results.len() > 0
304    MultipleResults::PartialNonRetryable(non_retryable_results.into_iter().next().unwrap())
305}
306
307/// Parses manifest infos from extensions.
308pub fn parse_manifest_infos_from_extensions(
309    extensions: &HashMap<String, Vec<u8>>,
310) -> Result<Vec<(RegionId, RegionManifestInfo)>> {
311    let data_manifest_version =
312        extensions
313            .get(MANIFEST_INFO_EXTENSION_KEY)
314            .context(error::UnexpectedSnafu {
315                err_msg: "manifest info extension not found",
316            })?;
317    let data_manifest_version =
318        RegionManifestInfo::decode_list(data_manifest_version).context(error::SerdeJsonSnafu {})?;
319    Ok(data_manifest_version)
320}
321
322/// Parses column metadatas from extensions.
323pub fn parse_column_metadatas(
324    extensions: &HashMap<String, Vec<u8>>,
325    key: &str,
326) -> Result<Vec<ColumnMetadata>> {
327    let value = extensions.get(key).context(error::UnexpectedSnafu {
328        err_msg: format!("column metadata extension not found: {}", key),
329    })?;
330    let column_metadatas = ColumnMetadata::decode_list(value).context(error::SerdeJsonSnafu {})?;
331    Ok(column_metadatas)
332}
333
334/// Sync follower regions on datanodes.
335pub async fn sync_follower_regions(
336    context: &DdlContext,
337    table_id: TableId,
338    results: &[RegionResponse],
339    region_routes: &[RegionRoute],
340    engine: &str,
341) -> Result<()> {
342    if engine != MITO_ENGINE && engine != METRIC_ENGINE {
343        info!(
344            "Skip submitting sync region requests for table_id: {}, engine: {}",
345            table_id, engine
346        );
347        return Ok(());
348    }
349
350    let results = results
351        .iter()
352        .map(|response| parse_manifest_infos_from_extensions(&response.extensions))
353        .collect::<Result<Vec<_>>>()?
354        .into_iter()
355        .flatten()
356        .collect::<HashMap<_, _>>();
357
358    let is_mito_engine = engine == MITO_ENGINE;
359
360    let followers = find_followers(region_routes);
361    if followers.is_empty() {
362        return Ok(());
363    }
364    let mut sync_region_tasks = Vec::with_capacity(followers.len());
365    for datanode in followers {
366        let requester = context.node_manager.datanode(&datanode).await;
367        let regions = find_follower_regions(region_routes, &datanode);
368        for region in regions {
369            let region_id = RegionId::new(table_id, region);
370            let manifest_info = if is_mito_engine {
371                let region_manifest_info =
372                    results.get(&region_id).context(error::UnexpectedSnafu {
373                        err_msg: format!("No manifest info found for region {}", region_id),
374                    })?;
375                ensure!(
376                    region_manifest_info.is_mito(),
377                    error::UnexpectedSnafu {
378                        err_msg: format!("Region {} is not a mito region", region_id)
379                    }
380                );
381                ManifestInfo::MitoManifestInfo(MitoManifestInfo {
382                    data_manifest_version: region_manifest_info.data_manifest_version(),
383                })
384            } else {
385                let region_manifest_info =
386                    results.get(&region_id).context(error::UnexpectedSnafu {
387                        err_msg: format!("No manifest info found for region {}", region_id),
388                    })?;
389                ensure!(
390                    region_manifest_info.is_metric(),
391                    error::UnexpectedSnafu {
392                        err_msg: format!("Region {} is not a metric region", region_id)
393                    }
394                );
395                ManifestInfo::MetricManifestInfo(MetricManifestInfo {
396                    data_manifest_version: region_manifest_info.data_manifest_version(),
397                    metadata_manifest_version: region_manifest_info
398                        .metadata_manifest_version()
399                        .unwrap_or_default(),
400                })
401            };
402            let request = RegionRequest {
403                header: Some(RegionRequestHeader {
404                    tracing_context: TracingContext::from_current_span().to_w3c(),
405                    ..Default::default()
406                }),
407                body: Some(region_request::Body::Sync(SyncRequest {
408                    region_id: region_id.as_u64(),
409                    manifest_info: Some(manifest_info),
410                })),
411            };
412
413            let datanode = datanode.clone();
414            let requester = requester.clone();
415            sync_region_tasks.push(async move {
416                requester
417                    .handle(request)
418                    .await
419                    .map_err(add_peer_context_if_needed(datanode))
420            });
421        }
422    }
423
424    // Failure to sync region is not critical.
425    // We try our best to sync the regions.
426    if let Err(err) = join_all(sync_region_tasks)
427        .await
428        .into_iter()
429        .collect::<Result<Vec<_>>>()
430    {
431        error!(err; "Failed to sync follower regions on datanodes, table_id: {}", table_id);
432    }
433    info!("Sync follower regions on datanodes, table_id: {}", table_id);
434
435    Ok(())
436}
437
438/// Extracts column metadatas from extensions.
439pub fn extract_column_metadatas(
440    results: &mut [RegionResponse],
441    key: &str,
442) -> Result<Option<Vec<ColumnMetadata>>> {
443    let schemas = results
444        .iter_mut()
445        .map(|r| r.extensions.remove(key))
446        .collect::<Vec<_>>();
447
448    if schemas.is_empty() {
449        return Ok(None);
450    }
451
452    // Verify all the physical schemas are the same
453    // Safety: previous check ensures this vec is not empty
454    let first = schemas.first().unwrap();
455    ensure!(
456        schemas.iter().all(|x| x == first),
457        MetadataCorruptionSnafu {
458            err_msg: "The table column metadata schemas from datanodes are not the same."
459        }
460    );
461
462    if let Some(first) = first {
463        let column_metadatas = ColumnMetadata::decode_list(first).context(DecodeJsonSnafu)?;
464        Ok(Some(column_metadatas))
465    } else {
466        Ok(None)
467    }
468}
469
#[cfg(test)]
mod tests {
    use super::*;

    /// A path built by `region_storage_path` must parse back into its two parts.
    #[test]
    fn test_get_catalog_and_schema() {
        let (expected_catalog, expected_schema) = ("my_catalog", "my_schema");
        let storage_path = region_storage_path(expected_catalog, expected_schema);

        let parsed = get_catalog_and_schema(&storage_path);

        assert_eq!(
            parsed,
            Some((expected_catalog.to_string(), expected_schema.to_string()))
        );
    }
}
483}