common_meta/ddl/create_logical_tables/
update_metadata.rs1use std::ops::Deref;
16
17use common_telemetry::{info, warn};
18use itertools::Itertools;
19use snafu::OptionExt;
20use table::metadata::TableId;
21use table::table_name::TableName;
22
23use crate::cache_invalidator::Context;
24use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
25use crate::ddl::physical_table_metadata;
26use crate::error::{Result, TableInfoNotFoundSnafu};
27use crate::instruction::CacheIdent;
28
29impl CreateLogicalTablesProcedure {
30 pub(crate) async fn update_physical_table_metadata(&mut self) -> Result<()> {
31 if self.data.physical_columns.is_empty() {
32 warn!("No physical columns found, leaving the physical table's schema unchanged when creating logical tables");
33 return Ok(());
34 }
35
36 let physical_table_info = self
38 .context
39 .table_metadata_manager
40 .table_info_manager()
41 .get(self.data.physical_table_id)
42 .await?
43 .with_context(|| TableInfoNotFoundSnafu {
44 table: format!("table id - {}", self.data.physical_table_id),
45 })?;
46
47 let raw_table_info = physical_table_info.deref().table_info.clone();
49
50 let new_table_info = physical_table_metadata::build_new_physical_table_info(
51 raw_table_info,
52 &self.data.physical_columns,
53 );
54
55 let physical_table_name = TableName::new(
56 &new_table_info.catalog_name,
57 &new_table_info.schema_name,
58 &new_table_info.name,
59 );
60
61 self.context
63 .table_metadata_manager
64 .update_table_info(&physical_table_info, None, new_table_info)
65 .await?;
66
67 self.context
69 .cache_invalidator
70 .invalidate(
71 &Context::default(),
72 &[
73 CacheIdent::TableId(self.data.physical_table_id),
74 CacheIdent::TableName(physical_table_name),
75 ],
76 )
77 .await?;
78
79 Ok(())
80 }
81
82 pub(crate) async fn create_logical_tables_metadata(&mut self) -> Result<Vec<TableId>> {
83 let remaining_tasks = self.data.remaining_tasks();
84 let num_tables = remaining_tasks.len();
85
86 if num_tables > 0 {
87 let chunk_size = self
88 .context
89 .table_metadata_manager
90 .create_logical_tables_metadata_chunk_size();
91 if num_tables > chunk_size {
92 let chunks = remaining_tasks
93 .into_iter()
94 .chunks(chunk_size)
95 .into_iter()
96 .map(|chunk| chunk.collect::<Vec<_>>())
97 .collect::<Vec<_>>();
98 for chunk in chunks {
99 self.context
100 .table_metadata_manager
101 .create_logical_tables_metadata(chunk)
102 .await?;
103 }
104 } else {
105 self.context
106 .table_metadata_manager
107 .create_logical_tables_metadata(remaining_tasks)
108 .await?;
109 }
110 }
111
112 let table_ids = self
115 .data
116 .tasks
117 .iter()
118 .map(|task| task.table_info.ident.table_id)
119 .collect::<Vec<_>>();
120
121 info!(
122 "Created {num_tables} tables {table_ids:?} metadata for physical table {}",
123 self.data.physical_table_id
124 );
125
126 Ok(table_ids)
127 }
128}