common_meta/ddl/utils/
table_info.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use snafu::OptionExt;
17use store_api::storage::TableId;
18use table::metadata::RawTableInfo;
19use table::table_reference::TableReference;
20
21use crate::error::{Result, TableInfoNotFoundSnafu};
22use crate::key::table_info::{TableInfoManager, TableInfoValue};
23use crate::key::table_route::{TableRouteManager, TableRouteValue};
24use crate::key::{DeserializedValueWithBytes, TableMetadataManager};
25
26/// Get all table info values by table ids.
27///
28/// Returns an error if any table does not exist.
29pub(crate) async fn get_all_table_info_values_by_table_ids<'a>(
30    table_info_manager: &TableInfoManager,
31    table_ids: &[TableId],
32    table_names: &[TableReference<'a>],
33) -> Result<Vec<DeserializedValueWithBytes<TableInfoValue>>> {
34    let mut table_info_map = table_info_manager.batch_get_raw(table_ids).await?;
35    let mut table_info_values = Vec::with_capacity(table_ids.len());
36    for (table_id, table_name) in table_ids.iter().zip(table_names) {
37        let table_info_value =
38            table_info_map
39                .remove(table_id)
40                .with_context(|| TableInfoNotFoundSnafu {
41                    table: table_name.to_string(),
42                })?;
43        table_info_values.push(table_info_value);
44    }
45
46    Ok(table_info_values)
47}
48
49/// Checks if all the logical table routes have the same physical table id.
50pub(crate) async fn all_logical_table_routes_have_same_physical_id(
51    table_route_manager: &TableRouteManager,
52    table_ids: &[TableId],
53    physical_table_id: TableId,
54) -> Result<bool> {
55    let table_routes = table_route_manager
56        .table_route_storage()
57        .batch_get(table_ids)
58        .await?;
59
60    let is_same_physical_table = table_routes.iter().all(|r| {
61        if let Some(TableRouteValue::Logical(r)) = r {
62            r.physical_table_id() == physical_table_id
63        } else {
64            false
65        }
66    });
67
68    Ok(is_same_physical_table)
69}
70
71/// Batch updates the table info values.
72///
73/// The table info values are grouped into chunks, and each chunk is updated in a single transaction.
74///
75/// Returns an error if any table info value fails to update.
76pub(crate) async fn batch_update_table_info_values(
77    table_metadata_manager: &TableMetadataManager,
78    table_info_values: Vec<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)>,
79) -> Result<()> {
80    let chunk_size = table_metadata_manager.batch_update_table_info_value_chunk_size();
81    if table_info_values.len() > chunk_size {
82        let chunks = table_info_values
83            .into_iter()
84            .chunks(chunk_size)
85            .into_iter()
86            .map(|check| check.collect::<Vec<_>>())
87            .collect::<Vec<_>>();
88        for chunk in chunks {
89            table_metadata_manager
90                .batch_update_table_info_values(chunk)
91                .await?;
92        }
93    } else {
94        table_metadata_manager
95            .batch_update_table_info_values(table_info_values)
96            .await?;
97    }
98
99    Ok(())
100}