common_meta/ddl/create_logical_tables/
update_metadata.rs1use std::ops::Deref;
16
17use common_telemetry::{info, warn};
18use itertools::Itertools;
19use snafu::OptionExt;
20use table::metadata::TableId;
21use table::table_name::TableName;
22
23use crate::cache_invalidator::Context;
24use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
25use crate::ddl::utils::raw_table_info;
26use crate::error::{Result, TableInfoNotFoundSnafu};
27use crate::instruction::CacheIdent;
28
29impl CreateLogicalTablesProcedure {
30 pub(crate) async fn update_physical_table_metadata(&mut self) -> Result<()> {
31 if self.data.physical_columns.is_empty() {
32 warn!(
33 "No physical columns found, leaving the physical table's schema unchanged when creating logical tables"
34 );
35 return Ok(());
36 }
37
38 let physical_table_info = self
40 .context
41 .table_metadata_manager
42 .table_info_manager()
43 .get(self.data.physical_table_id)
44 .await?
45 .with_context(|| TableInfoNotFoundSnafu {
46 table: format!("table id - {}", self.data.physical_table_id),
47 })?;
48
49 let raw_table_info = physical_table_info.deref().table_info.clone();
51
52 let new_table_info = raw_table_info::build_new_physical_table_info(
53 raw_table_info,
54 &self.data.physical_columns,
55 );
56
57 let physical_table_name = TableName::new(
58 &new_table_info.catalog_name,
59 &new_table_info.schema_name,
60 &new_table_info.name,
61 );
62
63 self.context
65 .table_metadata_manager
66 .update_table_info(&physical_table_info, None, new_table_info)
67 .await?;
68
69 self.context
71 .cache_invalidator
72 .invalidate(
73 &Context::default(),
74 &[
75 CacheIdent::TableId(self.data.physical_table_id),
76 CacheIdent::TableName(physical_table_name),
77 ],
78 )
79 .await?;
80
81 Ok(())
82 }
83
84 pub(crate) async fn create_logical_tables_metadata(&mut self) -> Result<Vec<TableId>> {
85 let remaining_tasks = self.data.remaining_tasks();
86 let num_tables = remaining_tasks.len();
87
88 if num_tables > 0 {
89 let chunk_size = self
90 .context
91 .table_metadata_manager
92 .create_logical_tables_metadata_chunk_size();
93 if num_tables > chunk_size {
94 let chunks = remaining_tasks
95 .into_iter()
96 .chunks(chunk_size)
97 .into_iter()
98 .map(|chunk| chunk.collect::<Vec<_>>())
99 .collect::<Vec<_>>();
100 for chunk in chunks {
101 self.context
102 .table_metadata_manager
103 .create_logical_tables_metadata(chunk)
104 .await?;
105 }
106 } else {
107 self.context
108 .table_metadata_manager
109 .create_logical_tables_metadata(remaining_tasks)
110 .await?;
111 }
112 }
113
114 let table_ids = self
117 .data
118 .tasks
119 .iter()
120 .map(|task| task.table_info.ident.table_id)
121 .collect::<Vec<_>>();
122
123 info!(
124 "Created {num_tables} tables {table_ids:?} metadata for physical table {}",
125 self.data.physical_table_id
126 );
127
128 Ok(table_ids)
129 }
130}