common_meta/ddl/utils/
table_info.rs1use itertools::Itertools;
16use snafu::OptionExt;
17use store_api::storage::TableId;
18use table::metadata::RawTableInfo;
19use table::table_reference::TableReference;
20
21use crate::error::{Result, TableInfoNotFoundSnafu};
22use crate::key::table_info::{TableInfoManager, TableInfoValue};
23use crate::key::table_route::{TableRouteManager, TableRouteValue};
24use crate::key::{DeserializedValueWithBytes, TableMetadataManager};
25
26pub(crate) async fn get_all_table_info_values_by_table_ids<'a>(
30    table_info_manager: &TableInfoManager,
31    table_ids: &[TableId],
32    table_names: &[TableReference<'a>],
33) -> Result<Vec<DeserializedValueWithBytes<TableInfoValue>>> {
34    let mut table_info_map = table_info_manager.batch_get_raw(table_ids).await?;
35    let mut table_info_values = Vec::with_capacity(table_ids.len());
36    for (table_id, table_name) in table_ids.iter().zip(table_names) {
37        let table_info_value =
38            table_info_map
39                .remove(table_id)
40                .with_context(|| TableInfoNotFoundSnafu {
41                    table: table_name.to_string(),
42                })?;
43        table_info_values.push(table_info_value);
44    }
45
46    Ok(table_info_values)
47}
48
49pub(crate) async fn all_logical_table_routes_have_same_physical_id(
51    table_route_manager: &TableRouteManager,
52    table_ids: &[TableId],
53    physical_table_id: TableId,
54) -> Result<bool> {
55    let table_routes = table_route_manager
56        .table_route_storage()
57        .batch_get(table_ids)
58        .await?;
59
60    let is_same_physical_table = table_routes.iter().all(|r| {
61        if let Some(TableRouteValue::Logical(r)) = r {
62            r.physical_table_id() == physical_table_id
63        } else {
64            false
65        }
66    });
67
68    Ok(is_same_physical_table)
69}
70
71pub(crate) async fn batch_update_table_info_values(
77    table_metadata_manager: &TableMetadataManager,
78    table_info_values: Vec<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)>,
79) -> Result<()> {
80    let chunk_size = table_metadata_manager.batch_update_table_info_value_chunk_size();
81    if table_info_values.len() > chunk_size {
82        let chunks = table_info_values
83            .into_iter()
84            .chunks(chunk_size)
85            .into_iter()
86            .map(|check| check.collect::<Vec<_>>())
87            .collect::<Vec<_>>();
88        for chunk in chunks {
89            table_metadata_manager
90                .batch_update_table_info_values(chunk)
91                .await?;
92        }
93    } else {
94        table_metadata_manager
95            .batch_update_table_info_values(table_info_values)
96            .await?;
97    }
98
99    Ok(())
100}