common_meta/ddl/alter_logical_tables/
update_metadata.rs1use common_grpc_expr::alter_expr_to_request;
16use snafu::ResultExt;
17use table::metadata::{RawTableInfo, TableInfo};
18
19use crate::ddl::alter_logical_tables::executor::AlterLogicalTablesExecutor;
20use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
21use crate::ddl::utils::table_info::batch_update_table_info_values;
22use crate::error;
23use crate::error::{ConvertAlterTableRequestSnafu, Result};
24use crate::key::table_info::TableInfoValue;
25use crate::key::DeserializedValueWithBytes;
26use crate::rpc::ddl::AlterTableTask;
27use crate::rpc::router::region_distribution;
28
29impl AlterLogicalTablesProcedure {
30 pub(crate) async fn update_physical_table_metadata(&mut self) -> Result<()> {
31 let physical_table_info = self.data.physical_table_info.as_ref().unwrap();
33 let physical_table_route = self.data.physical_table_route.as_ref().unwrap();
34 let region_distribution = region_distribution(&physical_table_route.region_routes);
35
36 AlterLogicalTablesExecutor::on_alter_metadata(
38 self.data.physical_table_id,
39 &self.context.table_metadata_manager,
40 physical_table_info,
41 region_distribution,
42 &self.data.physical_columns,
43 )
44 .await?;
45
46 Ok(())
47 }
48
49 pub(crate) async fn update_logical_tables_metadata(&mut self) -> Result<()> {
50 let table_info_values = self.build_update_metadata()?;
51 batch_update_table_info_values(&self.context.table_metadata_manager, table_info_values)
52 .await
53 }
54
55 pub(crate) fn build_update_metadata(
56 &self,
57 ) -> Result<Vec<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)>> {
58 let mut table_info_values_to_update = Vec::with_capacity(self.data.tasks.len());
59 for (task, table) in self
60 .data
61 .tasks
62 .iter()
63 .zip(self.data.table_info_values.iter())
64 {
65 table_info_values_to_update.push(self.build_new_table_info(task, table)?);
66 }
67
68 Ok(table_info_values_to_update)
69 }
70
71 fn build_new_table_info(
72 &self,
73 task: &AlterTableTask,
74 table: &DeserializedValueWithBytes<TableInfoValue>,
75 ) -> Result<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)> {
76 let table_info = TableInfo::try_from(table.table_info.clone())
78 .context(error::ConvertRawTableInfoSnafu)?;
79 let table_ref = task.table_ref();
80 let request = alter_expr_to_request(
81 table.table_info.ident.table_id,
82 task.alter_table.clone(),
83 Some(&table_info.meta),
84 )
85 .context(ConvertAlterTableRequestSnafu)?;
86 let new_meta = table_info
87 .meta
88 .builder_with_alter_kind(table_ref.table, &request.alter_kind)
89 .context(error::TableSnafu)?
90 .build()
91 .with_context(|_| error::BuildTableMetaSnafu {
92 table_name: table_ref.table,
93 })?;
94 let version = table_info.ident.version + 1;
95 let mut new_table = table_info;
96 new_table.meta = new_meta;
97 new_table.ident.version = version;
98
99 let mut raw_table_info = RawTableInfo::from(new_table);
100 raw_table_info.sort_columns();
101
102 Ok((table.clone(), raw_table_info))
103 }
104}