common_meta/ddl/
utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod raw_table_info;
16#[allow(dead_code)]
17pub(crate) mod region_metadata_lister;
18pub(crate) mod table_id;
19pub(crate) mod table_info;
20
21use std::collections::HashMap;
22use std::fmt::Debug;
23
24use api::region::RegionResponse;
25use api::v1::region::sync_request::ManifestInfo;
26use api::v1::region::{
27    region_request, MetricManifestInfo, MitoManifestInfo, RegionRequest, RegionRequestHeader,
28    SyncRequest,
29};
30use common_catalog::consts::{METRIC_ENGINE, MITO_ENGINE};
31use common_error::ext::BoxedError;
32use common_procedure::error::Error as ProcedureError;
33use common_telemetry::tracing_context::TracingContext;
34use common_telemetry::{error, info, warn};
35use common_wal::options::WalOptions;
36use futures::future::join_all;
37use snafu::{ensure, OptionExt, ResultExt};
38use store_api::metadata::ColumnMetadata;
39use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, MANIFEST_INFO_EXTENSION_KEY};
40use store_api::region_engine::RegionManifestInfo;
41use store_api::storage::{RegionId, RegionNumber};
42use table::metadata::TableId;
43use table::table_reference::TableReference;
44
45use crate::ddl::{DdlContext, DetectingRegion};
46use crate::error::{
47    self, DecodeJsonSnafu, Error, MetadataCorruptionSnafu, OperateDatanodeSnafu,
48    ParseWalOptionsSnafu, Result, TableNotFoundSnafu, UnsupportedSnafu,
49};
50use crate::key::datanode_table::DatanodeTableValue;
51use crate::key::table_name::TableNameKey;
52use crate::key::table_route::TableRouteValue;
53use crate::key::{TableMetadataManager, TableMetadataManagerRef};
54use crate::peer::Peer;
55use crate::rpc::ddl::CreateTableTask;
56use crate::rpc::router::{find_follower_regions, find_followers, RegionRoute};
57
58/// Adds [Peer] context if the error is unretryable.
59pub fn add_peer_context_if_needed(datanode: Peer) -> impl FnOnce(Error) -> Error {
60    move |err| {
61        error!(err; "Failed to operate datanode, peer: {}", datanode);
62        if !err.is_retry_later() {
63            return Err::<(), BoxedError>(BoxedError::new(err))
64                .context(OperateDatanodeSnafu { peer: datanode })
65                .unwrap_err();
66        }
67        err
68    }
69}
70
71/// Maps the error to the corresponding procedure error.
72///
73/// This function determines whether the error should be retried and if poison cleanup is needed,
74/// then maps it to the appropriate procedure error variant.
75pub fn map_to_procedure_error(e: Error) -> ProcedureError {
76    match (e.is_retry_later(), e.need_clean_poisons()) {
77        (true, true) => ProcedureError::retry_later_and_clean_poisons(e),
78        (true, false) => ProcedureError::retry_later(e),
79        (false, true) => ProcedureError::external_and_clean_poisons(e),
80        (false, false) => ProcedureError::external(e),
81    }
82}
83
/// Builds a region's storage path from its catalog and schema, formatted as
/// `{catalog}/{schema}`.
#[inline]
pub fn region_storage_path(catalog: &str, schema: &str) -> String {
    format!("{catalog}/{schema}")
}
88
/// Splits a path produced by [region_storage_path] back into `(catalog, schema)`.
///
/// Only the first two `/`-separated segments are used, and anything after the
/// second separator is ignored. Returns `None` when the path has no `/` at all.
pub fn get_catalog_and_schema(path: &str) -> Option<(String, String)> {
    let mut segments = path.splitn(3, '/');
    let catalog = segments.next()?;
    let schema = segments.next()?;
    Some((catalog.to_owned(), schema.to_owned()))
}
94
95pub async fn check_and_get_physical_table_id(
96    table_metadata_manager: &TableMetadataManagerRef,
97    tasks: &[CreateTableTask],
98) -> Result<TableId> {
99    let mut physical_table_name = None;
100    for task in tasks {
101        ensure!(
102            task.create_table.engine == METRIC_ENGINE,
103            UnsupportedSnafu {
104                operation: format!("create table with engine {}", task.create_table.engine)
105            }
106        );
107        let current_physical_table_name = task
108            .create_table
109            .table_options
110            .get(LOGICAL_TABLE_METADATA_KEY)
111            .context(UnsupportedSnafu {
112                operation: format!(
113                    "create table without table options {}",
114                    LOGICAL_TABLE_METADATA_KEY,
115                ),
116            })?;
117        let current_physical_table_name = TableNameKey::new(
118            &task.create_table.catalog_name,
119            &task.create_table.schema_name,
120            current_physical_table_name,
121        );
122
123        physical_table_name = match physical_table_name {
124            Some(name) => {
125                ensure!(
126                    name == current_physical_table_name,
127                    UnsupportedSnafu {
128                        operation: format!(
129                            "create table with different physical table name {} and {}",
130                            name, current_physical_table_name
131                        )
132                    }
133                );
134                Some(name)
135            }
136            None => Some(current_physical_table_name),
137        };
138    }
139    // Safety: `physical_table_name` is `Some` here
140    let physical_table_name = physical_table_name.unwrap();
141    table_metadata_manager
142        .table_name_manager()
143        .get(physical_table_name)
144        .await?
145        .with_context(|| TableNotFoundSnafu {
146            table_name: TableReference::from(physical_table_name).to_string(),
147        })
148        .map(|table| table.table_id())
149}
150
151pub async fn get_physical_table_id(
152    table_metadata_manager: &TableMetadataManagerRef,
153    logical_table_name: TableNameKey<'_>,
154) -> Result<TableId> {
155    let logical_table_id = table_metadata_manager
156        .table_name_manager()
157        .get(logical_table_name)
158        .await?
159        .with_context(|| TableNotFoundSnafu {
160            table_name: TableReference::from(logical_table_name).to_string(),
161        })
162        .map(|table| table.table_id())?;
163
164    table_metadata_manager
165        .table_route_manager()
166        .get_physical_table_id(logical_table_id)
167        .await
168}
169
170/// Converts a list of [`RegionRoute`] to a list of [`DetectingRegion`].
171pub fn convert_region_routes_to_detecting_regions(
172    region_routes: &[RegionRoute],
173) -> Vec<DetectingRegion> {
174    region_routes
175        .iter()
176        .flat_map(|route| {
177            route
178                .leader_peer
179                .as_ref()
180                .map(|peer| (peer.id, route.region.id))
181        })
182        .collect::<Vec<_>>()
183}
184
185/// Parses [WalOptions] from serialized strings in hashmap.
186pub fn parse_region_wal_options(
187    serialized_options: &HashMap<RegionNumber, String>,
188) -> Result<HashMap<RegionNumber, WalOptions>> {
189    let mut region_wal_options = HashMap::with_capacity(serialized_options.len());
190    for (region_number, wal_options) in serialized_options {
191        let wal_option = serde_json::from_str::<WalOptions>(wal_options)
192            .context(ParseWalOptionsSnafu { wal_options })?;
193        region_wal_options.insert(*region_number, wal_option);
194    }
195    Ok(region_wal_options)
196}
197
198/// Gets the wal options for a table.
199pub async fn get_region_wal_options(
200    table_metadata_manager: &TableMetadataManager,
201    table_route_value: &TableRouteValue,
202    physical_table_id: TableId,
203) -> Result<HashMap<RegionNumber, WalOptions>> {
204    let region_wal_options =
205        if let TableRouteValue::Physical(table_route_value) = &table_route_value {
206            let datanode_table_values = table_metadata_manager
207                .datanode_table_manager()
208                .regions(physical_table_id, table_route_value)
209                .await?;
210            extract_region_wal_options(&datanode_table_values)?
211        } else {
212            HashMap::new()
213        };
214    Ok(region_wal_options)
215}
216
217/// Extracts region wal options from [DatanodeTableValue]s.
218pub fn extract_region_wal_options(
219    datanode_table_values: &Vec<DatanodeTableValue>,
220) -> Result<HashMap<RegionNumber, WalOptions>> {
221    let mut region_wal_options = HashMap::new();
222    for value in datanode_table_values {
223        let serialized_options = &value.region_info.region_wal_options;
224        let parsed_options = parse_region_wal_options(serialized_options)?;
225        region_wal_options.extend(parsed_options);
226    }
227    Ok(region_wal_options)
228}
229
230/// The result of multiple operations.
231///
232/// - Ok: all operations are successful.
233/// - PartialRetryable: if any operation is retryable and without non retryable error, the result is retryable.
234/// - PartialNonRetryable: if any operation is non retryable, the result is non retryable.
235/// - AllRetryable: all operations are retryable.
236/// - AllNonRetryable: all operations are not retryable.
237pub enum MultipleResults<T> {
238    Ok(Vec<T>),
239    PartialRetryable(Error),
240    PartialNonRetryable(Error),
241    AllRetryable(Error),
242    AllNonRetryable(Error),
243}
244
245/// Handles the results of alter region requests.
246///
247/// For partial success, we need to check if the errors are retryable.
248/// If all the errors are retryable, we return a retryable error.
249/// Otherwise, we return the first error.
250pub fn handle_multiple_results<T: Debug>(results: Vec<Result<T>>) -> MultipleResults<T> {
251    if results.is_empty() {
252        return MultipleResults::Ok(Vec::new());
253    }
254    let num_results = results.len();
255    let mut retryable_results = Vec::new();
256    let mut non_retryable_results = Vec::new();
257    let mut ok_results = Vec::new();
258
259    for result in results {
260        match result {
261            Ok(value) => ok_results.push(value),
262            Err(err) => {
263                if err.is_retry_later() {
264                    retryable_results.push(err);
265                } else {
266                    non_retryable_results.push(err);
267                }
268            }
269        }
270    }
271
272    common_telemetry::debug!(
273        "retryable_results: {}, non_retryable_results: {}, ok_results: {}",
274        retryable_results.len(),
275        non_retryable_results.len(),
276        ok_results.len()
277    );
278
279    if retryable_results.len() == num_results {
280        return MultipleResults::AllRetryable(retryable_results.into_iter().next().unwrap());
281    } else if non_retryable_results.len() == num_results {
282        warn!("all non retryable results: {}", non_retryable_results.len());
283        for err in &non_retryable_results {
284            error!(err; "non retryable error");
285        }
286        return MultipleResults::AllNonRetryable(non_retryable_results.into_iter().next().unwrap());
287    } else if ok_results.len() == num_results {
288        return MultipleResults::Ok(ok_results);
289    } else if !retryable_results.is_empty()
290        && !ok_results.is_empty()
291        && non_retryable_results.is_empty()
292    {
293        return MultipleResults::PartialRetryable(retryable_results.into_iter().next().unwrap());
294    }
295
296    warn!(
297        "partial non retryable results: {}, retryable results: {}, ok results: {}",
298        non_retryable_results.len(),
299        retryable_results.len(),
300        ok_results.len()
301    );
302    for err in &non_retryable_results {
303        error!(err; "non retryable error");
304    }
305    // non_retryable_results.len() > 0
306    MultipleResults::PartialNonRetryable(non_retryable_results.into_iter().next().unwrap())
307}
308
309/// Parses manifest infos from extensions.
310pub fn parse_manifest_infos_from_extensions(
311    extensions: &HashMap<String, Vec<u8>>,
312) -> Result<Vec<(RegionId, RegionManifestInfo)>> {
313    let data_manifest_version =
314        extensions
315            .get(MANIFEST_INFO_EXTENSION_KEY)
316            .context(error::UnexpectedSnafu {
317                err_msg: "manifest info extension not found",
318            })?;
319    let data_manifest_version =
320        RegionManifestInfo::decode_list(data_manifest_version).context(error::SerdeJsonSnafu {})?;
321    Ok(data_manifest_version)
322}
323
324/// Parses column metadatas from extensions.
325pub fn parse_column_metadatas(
326    extensions: &HashMap<String, Vec<u8>>,
327    key: &str,
328) -> Result<Vec<ColumnMetadata>> {
329    let value = extensions.get(key).context(error::UnexpectedSnafu {
330        err_msg: format!("column metadata extension not found: {}", key),
331    })?;
332    let column_metadatas = ColumnMetadata::decode_list(value).context(error::SerdeJsonSnafu {})?;
333    Ok(column_metadatas)
334}
335
336/// Sync follower regions on datanodes.
337pub async fn sync_follower_regions(
338    context: &DdlContext,
339    table_id: TableId,
340    results: &[RegionResponse],
341    region_routes: &[RegionRoute],
342    engine: &str,
343) -> Result<()> {
344    if engine != MITO_ENGINE && engine != METRIC_ENGINE {
345        info!(
346            "Skip submitting sync region requests for table_id: {}, engine: {}",
347            table_id, engine
348        );
349        return Ok(());
350    }
351
352    let results = results
353        .iter()
354        .map(|response| parse_manifest_infos_from_extensions(&response.extensions))
355        .collect::<Result<Vec<_>>>()?
356        .into_iter()
357        .flatten()
358        .collect::<HashMap<_, _>>();
359
360    let is_mito_engine = engine == MITO_ENGINE;
361
362    let followers = find_followers(region_routes);
363    if followers.is_empty() {
364        return Ok(());
365    }
366    let mut sync_region_tasks = Vec::with_capacity(followers.len());
367    for datanode in followers {
368        let requester = context.node_manager.datanode(&datanode).await;
369        let regions = find_follower_regions(region_routes, &datanode);
370        for region in regions {
371            let region_id = RegionId::new(table_id, region);
372            let manifest_info = if is_mito_engine {
373                let region_manifest_info =
374                    results.get(&region_id).context(error::UnexpectedSnafu {
375                        err_msg: format!("No manifest info found for region {}", region_id),
376                    })?;
377                ensure!(
378                    region_manifest_info.is_mito(),
379                    error::UnexpectedSnafu {
380                        err_msg: format!("Region {} is not a mito region", region_id)
381                    }
382                );
383                ManifestInfo::MitoManifestInfo(MitoManifestInfo {
384                    data_manifest_version: region_manifest_info.data_manifest_version(),
385                })
386            } else {
387                let region_manifest_info =
388                    results.get(&region_id).context(error::UnexpectedSnafu {
389                        err_msg: format!("No manifest info found for region {}", region_id),
390                    })?;
391                ensure!(
392                    region_manifest_info.is_metric(),
393                    error::UnexpectedSnafu {
394                        err_msg: format!("Region {} is not a metric region", region_id)
395                    }
396                );
397                ManifestInfo::MetricManifestInfo(MetricManifestInfo {
398                    data_manifest_version: region_manifest_info.data_manifest_version(),
399                    metadata_manifest_version: region_manifest_info
400                        .metadata_manifest_version()
401                        .unwrap_or_default(),
402                })
403            };
404            let request = RegionRequest {
405                header: Some(RegionRequestHeader {
406                    tracing_context: TracingContext::from_current_span().to_w3c(),
407                    ..Default::default()
408                }),
409                body: Some(region_request::Body::Sync(SyncRequest {
410                    region_id: region_id.as_u64(),
411                    manifest_info: Some(manifest_info),
412                })),
413            };
414
415            let datanode = datanode.clone();
416            let requester = requester.clone();
417            sync_region_tasks.push(async move {
418                requester
419                    .handle(request)
420                    .await
421                    .map_err(add_peer_context_if_needed(datanode))
422            });
423        }
424    }
425
426    // Failure to sync region is not critical.
427    // We try our best to sync the regions.
428    if let Err(err) = join_all(sync_region_tasks)
429        .await
430        .into_iter()
431        .collect::<Result<Vec<_>>>()
432    {
433        error!(err; "Failed to sync follower regions on datanodes, table_id: {}", table_id);
434    }
435    info!("Sync follower regions on datanodes, table_id: {}", table_id);
436
437    Ok(())
438}
439
440/// Extracts column metadatas from extensions.
441pub fn extract_column_metadatas(
442    results: &mut [RegionResponse],
443    key: &str,
444) -> Result<Option<Vec<ColumnMetadata>>> {
445    let schemas = results
446        .iter_mut()
447        .map(|r| r.extensions.remove(key))
448        .collect::<Vec<_>>();
449
450    if schemas.is_empty() {
451        warn!("extract_column_metadatas: no extension key `{key}` found in results");
452        return Ok(None);
453    }
454
455    // Verify all the physical schemas are the same
456    // Safety: previous check ensures this vec is not empty
457    let first = schemas.first().unwrap();
458    ensure!(
459        schemas.iter().all(|x| x == first),
460        MetadataCorruptionSnafu {
461            err_msg: "The table column metadata schemas from datanodes are not the same."
462        }
463    );
464
465    if let Some(first) = first {
466        let column_metadatas = ColumnMetadata::decode_list(first).context(DecodeJsonSnafu)?;
467        Ok(Some(column_metadatas))
468    } else {
469        Ok(None)
470    }
471}
472
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_get_catalog_and_schema() {
        let test_catalog = "my_catalog";
        let test_schema = "my_schema";
        let path = region_storage_path(test_catalog, test_schema);
        // Pin the on-disk layout so a format change is caught here.
        assert_eq!(path, "my_catalog/my_schema");
        let (catalog, schema) = get_catalog_and_schema(&path).unwrap();
        assert_eq!(catalog, test_catalog);
        assert_eq!(schema, test_schema);
    }

    #[test]
    fn test_get_catalog_and_schema_without_separator() {
        // A path lacking `/` cannot be split into catalog and schema.
        assert_eq!(get_catalog_and_schema("my_catalog"), None);
        assert_eq!(get_catalog_and_schema(""), None);
    }
}