common_meta/ddl/alter_logical_tables/
metadata.rs1use common_catalog::format_full_table_name;
16use snafu::OptionExt;
17use table::metadata::TableId;
18
19use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
20use crate::error::{
21 AlterLogicalTablesInvalidArgumentsSnafu, Result, TableInfoNotFoundSnafu, TableNotFoundSnafu,
22 TableRouteNotFoundSnafu,
23};
24use crate::key::table_info::TableInfoValue;
25use crate::key::table_name::TableNameKey;
26use crate::key::table_route::TableRouteValue;
27use crate::key::DeserializedValueWithBytes;
28use crate::rpc::ddl::AlterTableTask;
29
30impl AlterLogicalTablesProcedure {
31 pub(crate) fn filter_task(&mut self, finished_tasks: &[bool]) -> Result<()> {
32 debug_assert_eq!(finished_tasks.len(), self.data.tasks.len());
33 debug_assert_eq!(finished_tasks.len(), self.data.table_info_values.len());
34 self.data.tasks = self
35 .data
36 .tasks
37 .drain(..)
38 .zip(finished_tasks.iter())
39 .filter_map(|(task, finished)| if *finished { None } else { Some(task) })
40 .collect();
41 self.data.table_info_values = self
42 .data
43 .table_info_values
44 .drain(..)
45 .zip(finished_tasks.iter())
46 .filter_map(|(table_info_value, finished)| {
47 if *finished {
48 None
49 } else {
50 Some(table_info_value)
51 }
52 })
53 .collect();
54
55 Ok(())
56 }
57
58 pub(crate) async fn fill_physical_table_info(&mut self) -> Result<()> {
59 let (physical_table_info, physical_table_route) = self
60 .context
61 .table_metadata_manager
62 .get_full_table_info(self.data.physical_table_id)
63 .await?;
64
65 let physical_table_info = physical_table_info.with_context(|| TableInfoNotFoundSnafu {
66 table: format!("table id - {}", self.data.physical_table_id),
67 })?;
68 let physical_table_route = physical_table_route
69 .context(TableRouteNotFoundSnafu {
70 table_id: self.data.physical_table_id,
71 })?
72 .into_inner();
73
74 self.data.physical_table_info = Some(physical_table_info);
75 let TableRouteValue::Physical(physical_table_route) = physical_table_route else {
76 return AlterLogicalTablesInvalidArgumentsSnafu {
77 err_msg: format!(
78 "expected a physical table but got a logical table: {:?}",
79 self.data.physical_table_id
80 ),
81 }
82 .fail();
83 };
84 self.data.physical_table_route = Some(physical_table_route);
85
86 Ok(())
87 }
88
89 pub(crate) async fn fill_table_info_values(&mut self) -> Result<()> {
90 let table_ids = self.get_all_table_ids().await?;
91 let table_info_values = self.get_all_table_info_values(&table_ids).await?;
92 debug_assert_eq!(table_info_values.len(), self.data.tasks.len());
93 self.data.table_info_values = table_info_values;
94
95 Ok(())
96 }
97
98 async fn get_all_table_info_values(
99 &self,
100 table_ids: &[TableId],
101 ) -> Result<Vec<DeserializedValueWithBytes<TableInfoValue>>> {
102 let table_info_manager = self.context.table_metadata_manager.table_info_manager();
103 let mut table_info_map = table_info_manager.batch_get_raw(table_ids).await?;
104 let mut table_info_values = Vec::with_capacity(table_ids.len());
105 for (table_id, task) in table_ids.iter().zip(self.data.tasks.iter()) {
106 let table_info_value =
107 table_info_map
108 .remove(table_id)
109 .with_context(|| TableInfoNotFoundSnafu {
110 table: extract_table_name(task),
111 })?;
112 table_info_values.push(table_info_value);
113 }
114
115 Ok(table_info_values)
116 }
117
118 async fn get_all_table_ids(&self) -> Result<Vec<TableId>> {
119 let table_name_manager = self.context.table_metadata_manager.table_name_manager();
120 let table_name_keys = self
121 .data
122 .tasks
123 .iter()
124 .map(|task| extract_table_name_key(task))
125 .collect();
126
127 let table_name_values = table_name_manager.batch_get(table_name_keys).await?;
128 let mut table_ids = Vec::with_capacity(table_name_values.len());
129 for (value, task) in table_name_values.into_iter().zip(self.data.tasks.iter()) {
130 let table_id = value
131 .with_context(|| TableNotFoundSnafu {
132 table_name: extract_table_name(task),
133 })?
134 .table_id();
135 table_ids.push(table_id);
136 }
137
138 Ok(table_ids)
139 }
140}
141
142#[inline]
143fn extract_table_name(task: &AlterTableTask) -> String {
144 format_full_table_name(
145 &task.alter_table.catalog_name,
146 &task.alter_table.schema_name,
147 &task.alter_table.table_name,
148 )
149}
150
151#[inline]
152fn extract_table_name_key(task: &AlterTableTask) -> TableNameKey {
153 TableNameKey::new(
154 &task.alter_table.catalog_name,
155 &task.alter_table.schema_name,
156 &task.alter_table.table_name,
157 )
158}