Skip to main content

common_meta/ddl/
utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod raw_table_info;
16#[allow(dead_code)]
17pub(crate) mod region_metadata_lister;
18pub(crate) mod table_id;
19pub(crate) mod table_info;
20
21use std::collections::HashMap;
22use std::fmt::Debug;
23
24use api::region::RegionResponse;
25use api::v1::region::sync_request::ManifestInfo;
26use api::v1::region::{
27    MetricManifestInfo, MitoManifestInfo, RegionRequest, RegionRequestHeader, SyncRequest,
28    region_request,
29};
30use common_catalog::consts::{METRIC_ENGINE, MITO_ENGINE};
31use common_error::ext::BoxedError;
32use common_procedure::error::Error as ProcedureError;
33use common_telemetry::tracing_context::TracingContext;
34use common_telemetry::{error, info, warn};
35use futures::future::join_all;
36use snafu::{OptionExt, ResultExt, ensure};
37use store_api::metadata::ColumnMetadata;
38use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, MANIFEST_INFO_EXTENSION_KEY};
39use store_api::region_engine::RegionManifestInfo;
40use store_api::storage::RegionId;
41use table::metadata::TableId;
42use table::table_reference::TableReference;
43
44use crate::ddl::{DdlContext, DetectingRegion};
45use crate::error::{
46    self, DecodeJsonSnafu, Error, MetadataCorruptionSnafu, OperateDatanodeSnafu, Result,
47    TableNotFoundSnafu, UnsupportedSnafu,
48};
49use crate::key::datanode_table::DatanodeTableValue;
50use crate::key::table_name::TableNameKey;
51use crate::key::table_route::TableRouteValue;
52use crate::key::{TableMetadataManager, TableMetadataManagerRef};
53use crate::peer::Peer;
54use crate::rpc::ddl::CreateTableTask;
55use crate::rpc::router::{RegionRoute, find_follower_regions, find_followers};
56use crate::wal_provider::RegionWalOptions;
57
58/// Adds [Peer] context if the error is unretryable.
59pub fn add_peer_context_if_needed(datanode: Peer) -> impl FnOnce(Error) -> Error {
60    move |err| {
61        error!(err; "Failed to operate datanode, peer: {}", datanode);
62        if !err.is_retry_later() {
63            return Err::<(), BoxedError>(BoxedError::new(err))
64                .context(OperateDatanodeSnafu { peer: datanode })
65                .unwrap_err();
66        }
67        err
68    }
69}
70
71/// Maps the error to the corresponding procedure error.
72///
73/// This function determines whether the error should be retried and if poison cleanup is needed,
74/// then maps it to the appropriate procedure error variant.
75pub fn map_to_procedure_error(e: Error) -> ProcedureError {
76    match (e.is_retry_later(), e.need_clean_poisons()) {
77        (true, true) => ProcedureError::retry_later_and_clean_poisons(e),
78        (true, false) => ProcedureError::retry_later(e),
79        (false, true) => ProcedureError::external_and_clean_poisons(e),
80        (false, false) => ProcedureError::external(e),
81    }
82}
83
/// Builds the region storage path `"<catalog>/<schema>"`.
#[inline]
pub fn region_storage_path(catalog: &str, schema: &str) -> String {
    format!("{catalog}/{schema}")
}
88
/// Splits a path created by [region_storage_path] back into `(catalog, schema)`.
///
/// Only the first two `/`-separated segments are used; any further segments are ignored.
/// Returns `None` when the path contains no `/` at all.
pub fn get_catalog_and_schema(path: &str) -> Option<(String, String)> {
    let mut segments = path.split('/');
    let catalog = segments.next()?;
    let schema = segments.next()?;
    Some((catalog.to_string(), schema.to_string()))
}
94
95pub async fn check_and_get_physical_table_id(
96    table_metadata_manager: &TableMetadataManagerRef,
97    tasks: &[CreateTableTask],
98) -> Result<TableId> {
99    let mut physical_table_name = None;
100    for task in tasks {
101        ensure!(
102            task.create_table.engine == METRIC_ENGINE,
103            UnsupportedSnafu {
104                operation: format!("create table with engine {}", task.create_table.engine)
105            }
106        );
107        let current_physical_table_name = task
108            .create_table
109            .table_options
110            .get(LOGICAL_TABLE_METADATA_KEY)
111            .context(UnsupportedSnafu {
112                operation: format!(
113                    "create table without table options {}",
114                    LOGICAL_TABLE_METADATA_KEY,
115                ),
116            })?;
117        let current_physical_table_name = TableNameKey::new(
118            &task.create_table.catalog_name,
119            &task.create_table.schema_name,
120            current_physical_table_name,
121        );
122
123        physical_table_name = match physical_table_name {
124            Some(name) => {
125                ensure!(
126                    name == current_physical_table_name,
127                    UnsupportedSnafu {
128                        operation: format!(
129                            "create table with different physical table name {} and {}",
130                            name, current_physical_table_name
131                        )
132                    }
133                );
134                Some(name)
135            }
136            None => Some(current_physical_table_name),
137        };
138    }
139    // Safety: `physical_table_name` is `Some` here
140    let physical_table_name = physical_table_name.unwrap();
141    table_metadata_manager
142        .table_name_manager()
143        .get(physical_table_name)
144        .await?
145        .with_context(|| TableNotFoundSnafu {
146            table_name: TableReference::from(physical_table_name).to_string(),
147        })
148        .map(|table| table.table_id())
149}
150
151pub async fn get_physical_table_id(
152    table_metadata_manager: &TableMetadataManagerRef,
153    logical_table_name: TableNameKey<'_>,
154) -> Result<TableId> {
155    let logical_table_id = table_metadata_manager
156        .table_name_manager()
157        .get(logical_table_name)
158        .await?
159        .with_context(|| TableNotFoundSnafu {
160            table_name: TableReference::from(logical_table_name).to_string(),
161        })
162        .map(|table| table.table_id())?;
163
164    table_metadata_manager
165        .table_route_manager()
166        .get_physical_table_id(logical_table_id)
167        .await
168}
169
170/// Converts a list of [`RegionRoute`] to a list of [`DetectingRegion`].
171pub fn convert_region_routes_to_detecting_regions(
172    region_routes: &[RegionRoute],
173) -> Vec<DetectingRegion> {
174    region_routes
175        .iter()
176        .flat_map(|route| {
177            route
178                .leader_peer
179                .as_ref()
180                .map(|peer| (peer.id, route.region.id))
181        })
182        .collect::<Vec<_>>()
183}
184
185/// Gets the wal options for a table.
186pub async fn get_region_wal_options(
187    table_metadata_manager: &TableMetadataManager,
188    table_route_value: &TableRouteValue,
189    physical_table_id: TableId,
190) -> Result<RegionWalOptions> {
191    let region_wal_options =
192        if let TableRouteValue::Physical(table_route_value) = &table_route_value {
193            let datanode_table_values = table_metadata_manager
194                .datanode_table_manager()
195                .regions(physical_table_id, table_route_value)
196                .await?;
197            extract_region_wal_options(&datanode_table_values)?
198        } else {
199            HashMap::new()
200        };
201    Ok(region_wal_options)
202}
203
204/// Extracts region wal options from [DatanodeTableValue]s.
205pub fn extract_region_wal_options(
206    datanode_table_values: &Vec<DatanodeTableValue>,
207) -> Result<RegionWalOptions> {
208    let mut region_wal_options = RegionWalOptions::new();
209    for value in datanode_table_values {
210        region_wal_options.extend(value.region_info.region_wal_options.clone());
211    }
212    Ok(region_wal_options)
213}
214
/// The aggregated result of multiple operations.
///
/// - Ok: all operations are successful.
/// - PartialRetryable: some operations succeeded, the rest failed with retryable errors,
///   and there is no non-retryable error; the result is retryable.
/// - PartialNonRetryable: at least one operation failed with a non-retryable error (mixed
///   with successes and/or retryable errors); the result is non-retryable.
/// - AllRetryable: all operations failed with retryable errors.
/// - AllNonRetryable: all operations failed with non-retryable errors.
pub enum MultipleResults<T> {
    /// The values of all operations; every operation succeeded.
    Ok(Vec<T>),
    /// A retryable error; the other operations either succeeded or were retryable too.
    PartialRetryable(Error),
    /// A non-retryable error; not every operation failed this way.
    PartialNonRetryable(Error),
    /// A retryable error; every operation failed with a retryable error.
    AllRetryable(Error),
    /// A non-retryable error; every operation failed with a non-retryable error.
    AllNonRetryable(Error),
}
229
230/// Handles the results of alter region requests.
231///
232/// For partial success, we need to check if the errors are retryable.
233/// If all the errors are retryable, we return a retryable error.
234/// Otherwise, we return the first error.
235pub fn handle_multiple_results<T: Debug>(results: Vec<Result<T>>) -> MultipleResults<T> {
236    if results.is_empty() {
237        return MultipleResults::Ok(Vec::new());
238    }
239    let num_results = results.len();
240    let mut retryable_results = Vec::new();
241    let mut non_retryable_results = Vec::new();
242    let mut ok_results = Vec::new();
243
244    for result in results {
245        match result {
246            Ok(value) => ok_results.push(value),
247            Err(err) => {
248                if err.is_retry_later() {
249                    retryable_results.push(err);
250                } else {
251                    non_retryable_results.push(err);
252                }
253            }
254        }
255    }
256
257    common_telemetry::debug!(
258        "retryable_results: {}, non_retryable_results: {}, ok_results: {}",
259        retryable_results.len(),
260        non_retryable_results.len(),
261        ok_results.len()
262    );
263
264    if retryable_results.len() == num_results {
265        return MultipleResults::AllRetryable(retryable_results.into_iter().next().unwrap());
266    } else if non_retryable_results.len() == num_results {
267        warn!("all non retryable results: {}", non_retryable_results.len());
268        for err in &non_retryable_results {
269            error!(err; "non retryable error");
270        }
271        return MultipleResults::AllNonRetryable(non_retryable_results.into_iter().next().unwrap());
272    } else if ok_results.len() == num_results {
273        return MultipleResults::Ok(ok_results);
274    } else if !retryable_results.is_empty()
275        && !ok_results.is_empty()
276        && non_retryable_results.is_empty()
277    {
278        return MultipleResults::PartialRetryable(retryable_results.into_iter().next().unwrap());
279    }
280
281    warn!(
282        "partial non retryable results: {}, retryable results: {}, ok results: {}",
283        non_retryable_results.len(),
284        retryable_results.len(),
285        ok_results.len()
286    );
287    for err in &non_retryable_results {
288        error!(err; "non retryable error");
289    }
290    // non_retryable_results.len() > 0
291    MultipleResults::PartialNonRetryable(non_retryable_results.into_iter().next().unwrap())
292}
293
294/// Parses manifest infos from extensions.
295pub fn parse_manifest_infos_from_extensions(
296    extensions: &HashMap<String, Vec<u8>>,
297) -> Result<Vec<(RegionId, RegionManifestInfo)>> {
298    let data_manifest_version =
299        extensions
300            .get(MANIFEST_INFO_EXTENSION_KEY)
301            .context(error::UnexpectedSnafu {
302                err_msg: "manifest info extension not found",
303            })?;
304    let data_manifest_version =
305        RegionManifestInfo::decode_list(data_manifest_version).context(error::SerdeJsonSnafu {})?;
306    Ok(data_manifest_version)
307}
308
309/// Parses column metadatas from extensions.
310pub fn parse_column_metadatas(
311    extensions: &HashMap<String, Vec<u8>>,
312    key: &str,
313) -> Result<Vec<ColumnMetadata>> {
314    let value = extensions.get(key).context(error::UnexpectedSnafu {
315        err_msg: format!("column metadata extension not found: {}", key),
316    })?;
317    let column_metadatas = ColumnMetadata::decode_list(value).context(error::SerdeJsonSnafu {})?;
318    Ok(column_metadatas)
319}
320
321/// Sync follower regions on datanodes.
322pub async fn sync_follower_regions(
323    context: &DdlContext,
324    table_id: TableId,
325    results: &[RegionResponse],
326    region_routes: &[RegionRoute],
327    engine: &str,
328) -> Result<()> {
329    if engine != MITO_ENGINE && engine != METRIC_ENGINE {
330        info!(
331            "Skip submitting sync region requests for table_id: {}, engine: {}",
332            table_id, engine
333        );
334        return Ok(());
335    }
336
337    let results = results
338        .iter()
339        .map(|response| parse_manifest_infos_from_extensions(&response.extensions))
340        .collect::<Result<Vec<_>>>()?
341        .into_iter()
342        .flatten()
343        .collect::<HashMap<_, _>>();
344
345    let is_mito_engine = engine == MITO_ENGINE;
346
347    let followers = find_followers(region_routes);
348    if followers.is_empty() {
349        return Ok(());
350    }
351    let mut sync_region_tasks = Vec::with_capacity(followers.len());
352    for datanode in followers {
353        let requester = context.node_manager.datanode(&datanode).await;
354        let regions = find_follower_regions(region_routes, &datanode);
355        for region in regions {
356            let region_id = RegionId::new(table_id, region);
357            let manifest_info = if is_mito_engine {
358                let region_manifest_info =
359                    results.get(&region_id).context(error::UnexpectedSnafu {
360                        err_msg: format!("No manifest info found for region {}", region_id),
361                    })?;
362                ensure!(
363                    region_manifest_info.is_mito(),
364                    error::UnexpectedSnafu {
365                        err_msg: format!("Region {} is not a mito region", region_id)
366                    }
367                );
368                ManifestInfo::MitoManifestInfo(MitoManifestInfo {
369                    data_manifest_version: region_manifest_info.data_manifest_version(),
370                })
371            } else {
372                let region_manifest_info =
373                    results.get(&region_id).context(error::UnexpectedSnafu {
374                        err_msg: format!("No manifest info found for region {}", region_id),
375                    })?;
376                ensure!(
377                    region_manifest_info.is_metric(),
378                    error::UnexpectedSnafu {
379                        err_msg: format!("Region {} is not a metric region", region_id)
380                    }
381                );
382                ManifestInfo::MetricManifestInfo(MetricManifestInfo {
383                    data_manifest_version: region_manifest_info.data_manifest_version(),
384                    metadata_manifest_version: region_manifest_info
385                        .metadata_manifest_version()
386                        .unwrap_or_default(),
387                })
388            };
389            let request = RegionRequest {
390                header: Some(RegionRequestHeader {
391                    tracing_context: TracingContext::from_current_span().to_w3c(),
392                    ..Default::default()
393                }),
394                body: Some(region_request::Body::Sync(SyncRequest {
395                    region_id: region_id.as_u64(),
396                    manifest_info: Some(manifest_info),
397                })),
398            };
399
400            let datanode = datanode.clone();
401            let requester = requester.clone();
402            sync_region_tasks.push(async move {
403                requester
404                    .handle(request)
405                    .await
406                    .map_err(add_peer_context_if_needed(datanode))
407            });
408        }
409    }
410
411    // Failure to sync region is not critical.
412    // We try our best to sync the regions.
413    if let Err(err) = join_all(sync_region_tasks)
414        .await
415        .into_iter()
416        .collect::<Result<Vec<_>>>()
417    {
418        error!(err; "Failed to sync follower regions on datanodes, table_id: {}", table_id);
419    }
420    info!("Sync follower regions on datanodes, table_id: {}", table_id);
421
422    Ok(())
423}
424
425/// Extracts column metadatas from extensions.
426pub fn extract_column_metadatas(
427    results: &mut [RegionResponse],
428    key: &str,
429) -> Result<Option<Vec<ColumnMetadata>>> {
430    let mut schemas = results
431        .iter_mut()
432        .map(|r| r.extensions.remove(key))
433        .collect::<Vec<_>>();
434
435    if schemas.is_empty() {
436        warn!("extract_column_metadatas: no extension key `{key}` found in results");
437        return Ok(None);
438    }
439
440    // Verify all the physical schemas are the same
441    // Safety: previous check ensures this vec is not empty
442    let first_column_metadatas = schemas
443        .swap_remove(0)
444        .map(|first_bytes| ColumnMetadata::decode_list(&first_bytes).context(DecodeJsonSnafu))
445        .transpose()?;
446
447    for s in schemas {
448        // check decoded column metadata instead of bytes because it contains extension map.
449        let column_metadata = s
450            .map(|bytes| ColumnMetadata::decode_list(&bytes).context(DecodeJsonSnafu))
451            .transpose()?;
452        ensure!(
453            column_metadata == first_column_metadatas,
454            MetadataCorruptionSnafu {
455                err_msg: format!(
456                    "The table column metadata schemas from datanodes are not the same. First: {:?}, Current: {:?}",
457                    first_column_metadatas, column_metadata,
458                ),
459            }
460        );
461    }
462    Ok(first_column_metadatas)
463}
464
#[cfg(test)]
mod tests {
    use super::*;

    /// A path built by `region_storage_path` splits back into its catalog and schema.
    #[test]
    fn test_get_catalog_and_schema() {
        let (expected_catalog, expected_schema) = ("my_catalog", "my_schema");
        let storage_path = region_storage_path(expected_catalog, expected_schema);

        let parsed = get_catalog_and_schema(&storage_path);

        assert_eq!(
            parsed,
            Some((expected_catalog.to_string(), expected_schema.to_string()))
        );
    }
}