common_meta/ddl/utils/
table_info.rs1use itertools::Itertools;
16use snafu::OptionExt;
17use store_api::storage::TableId;
18use table::metadata::RawTableInfo;
19use table::table_reference::TableReference;
20
21use crate::error::{Result, TableInfoNotFoundSnafu};
22use crate::key::table_info::{TableInfoManager, TableInfoValue};
23use crate::key::table_route::{TableRouteManager, TableRouteValue};
24use crate::key::{DeserializedValueWithBytes, TableMetadataManager};
25
26pub(crate) async fn get_all_table_info_values_by_table_ids<'a>(
30 table_info_manager: &TableInfoManager,
31 table_ids: &[TableId],
32 table_names: &[TableReference<'a>],
33) -> Result<Vec<DeserializedValueWithBytes<TableInfoValue>>> {
34 let mut table_info_map = table_info_manager.batch_get_raw(table_ids).await?;
35 let mut table_info_values = Vec::with_capacity(table_ids.len());
36 for (table_id, table_name) in table_ids.iter().zip(table_names) {
37 let table_info_value =
38 table_info_map
39 .remove(table_id)
40 .with_context(|| TableInfoNotFoundSnafu {
41 table: table_name.to_string(),
42 })?;
43 table_info_values.push(table_info_value);
44 }
45
46 Ok(table_info_values)
47}
48
49pub(crate) async fn all_logical_table_routes_have_same_physical_id(
51 table_route_manager: &TableRouteManager,
52 table_ids: &[TableId],
53 physical_table_id: TableId,
54) -> Result<bool> {
55 let table_routes = table_route_manager
56 .table_route_storage()
57 .batch_get(table_ids)
58 .await?;
59
60 let is_same_physical_table = table_routes.iter().all(|r| {
61 if let Some(TableRouteValue::Logical(r)) = r {
62 r.physical_table_id() == physical_table_id
63 } else {
64 false
65 }
66 });
67
68 Ok(is_same_physical_table)
69}
70
71pub(crate) async fn batch_update_table_info_values(
77 table_metadata_manager: &TableMetadataManager,
78 table_info_values: Vec<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)>,
79) -> Result<()> {
80 let chunk_size = table_metadata_manager.batch_update_table_info_value_chunk_size();
81 if table_info_values.len() > chunk_size {
82 let chunks = table_info_values
83 .into_iter()
84 .chunks(chunk_size)
85 .into_iter()
86 .map(|check| check.collect::<Vec<_>>())
87 .collect::<Vec<_>>();
88 for chunk in chunks {
89 table_metadata_manager
90 .batch_update_table_info_values(chunk)
91 .await?;
92 }
93 } else {
94 table_metadata_manager
95 .batch_update_table_info_values(table_info_values)
96 .await?;
97 }
98
99 Ok(())
100}