common_meta/ddl/alter_logical_tables/
update_metadata.rs1use common_grpc_expr::alter_expr_to_request;
16use snafu::ResultExt;
17use table::metadata::{RawTableInfo, TableInfo};
18
19use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
20use crate::ddl::alter_logical_tables::executor::AlterLogicalTablesExecutor;
21use crate::ddl::utils::table_info::batch_update_table_info_values;
22use crate::error;
23use crate::error::{ConvertAlterTableRequestSnafu, Result};
24use crate::key::DeserializedValueWithBytes;
25use crate::key::table_info::TableInfoValue;
26use crate::rpc::ddl::AlterTableTask;
27use crate::rpc::router::region_distribution;
28
29impl AlterLogicalTablesProcedure {
30 pub(crate) async fn update_physical_table_metadata(&mut self) -> Result<()> {
31 self.fetch_physical_table_route_if_non_exist().await?;
32 let physical_table_info = self.data.physical_table_info.as_ref().unwrap();
34 let physical_table_route = self.physical_table_route.as_ref().unwrap();
36 let region_distribution = region_distribution(&physical_table_route.region_routes);
37
38 AlterLogicalTablesExecutor::on_alter_metadata(
40 self.data.physical_table_id,
41 &self.context.table_metadata_manager,
42 physical_table_info,
43 region_distribution,
44 &self.data.physical_columns,
45 )
46 .await?;
47
48 Ok(())
49 }
50
51 pub(crate) async fn update_logical_tables_metadata(&mut self) -> Result<()> {
52 let table_info_values = self.build_update_metadata()?;
53 batch_update_table_info_values(&self.context.table_metadata_manager, table_info_values)
54 .await
55 }
56
57 pub(crate) fn build_update_metadata(
58 &self,
59 ) -> Result<Vec<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)>> {
60 let mut table_info_values_to_update = Vec::with_capacity(self.data.tasks.len());
61 for (task, table) in self
62 .data
63 .tasks
64 .iter()
65 .zip(self.data.table_info_values.iter())
66 {
67 table_info_values_to_update.push(self.build_new_table_info(task, table)?);
68 }
69
70 Ok(table_info_values_to_update)
71 }
72
73 fn build_new_table_info(
74 &self,
75 task: &AlterTableTask,
76 table: &DeserializedValueWithBytes<TableInfoValue>,
77 ) -> Result<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)> {
78 let table_info = TableInfo::try_from(table.table_info.clone())
80 .context(error::ConvertRawTableInfoSnafu)?;
81 let table_ref = task.table_ref();
82 let request = alter_expr_to_request(
83 table.table_info.ident.table_id,
84 task.alter_table.clone(),
85 Some(&table_info.meta),
86 )
87 .context(ConvertAlterTableRequestSnafu)?;
88 let new_meta = table_info
89 .meta
90 .builder_with_alter_kind(table_ref.table, &request.alter_kind)
91 .context(error::TableSnafu)?
92 .build()
93 .with_context(|_| error::BuildTableMetaSnafu {
94 table_name: table_ref.table,
95 })?;
96 let version = table_info.ident.version + 1;
97 let mut new_table = table_info;
98 new_table.meta = new_meta;
99 new_table.ident.version = version;
100
101 let mut raw_table_info = RawTableInfo::from(new_table);
102 raw_table_info.sort_columns();
103
104 Ok((table.clone(), raw_table_info))
105 }
106}