common_meta/ddl/alter_logical_tables/
update_metadata.rs1use common_grpc_expr::alter_expr_to_request;
16use common_telemetry::warn;
17use itertools::Itertools;
18use snafu::ResultExt;
19use table::metadata::{RawTableInfo, TableInfo};
20
21use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
22use crate::ddl::physical_table_metadata;
23use crate::error;
24use crate::error::{ConvertAlterTableRequestSnafu, Result};
25use crate::key::table_info::TableInfoValue;
26use crate::key::DeserializedValueWithBytes;
27use crate::rpc::ddl::AlterTableTask;
28
29impl AlterLogicalTablesProcedure {
30 pub(crate) async fn update_physical_table_metadata(&mut self) -> Result<()> {
31 if self.data.physical_columns.is_empty() {
32 warn!("No physical columns found, leaving the physical table's schema unchanged when altering logical tables");
33 return Ok(());
34 }
35
36 let physical_table_info = self.data.physical_table_info.as_ref().unwrap();
38
39 let old_raw_table_info = physical_table_info.table_info.clone();
41 let new_raw_table_info = physical_table_metadata::build_new_physical_table_info(
42 old_raw_table_info,
43 &self.data.physical_columns,
44 );
45
46 self.context
48 .table_metadata_manager
49 .update_table_info(physical_table_info, None, new_raw_table_info)
50 .await?;
51
52 Ok(())
53 }
54
55 pub(crate) async fn update_logical_tables_metadata(&mut self) -> Result<()> {
56 let table_info_values = self.build_update_metadata()?;
57 let manager = &self.context.table_metadata_manager;
58 let chunk_size = manager.batch_update_table_info_value_chunk_size();
59 if table_info_values.len() > chunk_size {
60 let chunks = table_info_values
61 .into_iter()
62 .chunks(chunk_size)
63 .into_iter()
64 .map(|check| check.collect::<Vec<_>>())
65 .collect::<Vec<_>>();
66 for chunk in chunks {
67 manager.batch_update_table_info_values(chunk).await?;
68 }
69 } else {
70 manager
71 .batch_update_table_info_values(table_info_values)
72 .await?;
73 }
74
75 Ok(())
76 }
77
78 pub(crate) fn build_update_metadata(
79 &self,
80 ) -> Result<Vec<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)>> {
81 let mut table_info_values_to_update = Vec::with_capacity(self.data.tasks.len());
82 for (task, table) in self
83 .data
84 .tasks
85 .iter()
86 .zip(self.data.table_info_values.iter())
87 {
88 table_info_values_to_update.push(self.build_new_table_info(task, table)?);
89 }
90
91 Ok(table_info_values_to_update)
92 }
93
94 fn build_new_table_info(
95 &self,
96 task: &AlterTableTask,
97 table: &DeserializedValueWithBytes<TableInfoValue>,
98 ) -> Result<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)> {
99 let table_info = TableInfo::try_from(table.table_info.clone())
101 .context(error::ConvertRawTableInfoSnafu)?;
102 let table_ref = task.table_ref();
103 let request =
104 alter_expr_to_request(table.table_info.ident.table_id, task.alter_table.clone())
105 .context(ConvertAlterTableRequestSnafu)?;
106 let new_meta = table_info
107 .meta
108 .builder_with_alter_kind(table_ref.table, &request.alter_kind)
109 .context(error::TableSnafu)?
110 .build()
111 .with_context(|_| error::BuildTableMetaSnafu {
112 table_name: table_ref.table,
113 })?;
114 let version = table_info.ident.version + 1;
115 let mut new_table = table_info;
116 new_table.meta = new_meta;
117 new_table.ident.version = version;
118
119 let mut raw_table_info = RawTableInfo::from(new_table);
120 raw_table_info.sort_columns();
121
122 Ok((table.clone(), raw_table_info))
123 }
124}