common_meta/reconciliation/reconcile_logical_tables/
update_table_infos.rs1use std::any::Any;
16use std::collections::HashMap;
17
18use common_procedure::{Context as ProcedureContext, Status};
19use common_telemetry::info;
20use serde::{Deserialize, Serialize};
21use store_api::metadata::ColumnMetadata;
22use store_api::storage::TableId;
23use table::metadata::RawTableInfo;
24use table::table_name::TableName;
25use table::table_reference::TableReference;
26
27use crate::cache_invalidator::Context as CacheContext;
28use crate::ddl::utils::table_info::{
29 batch_update_table_info_values, get_all_table_info_values_by_table_ids,
30};
31use crate::error::Result;
32use crate::instruction::CacheIdent;
33use crate::reconciliation::reconcile_logical_tables::reconciliation_end::ReconciliationEnd;
34use crate::reconciliation::reconcile_logical_tables::{ReconcileLogicalTablesContext, State};
35use crate::reconciliation::utils::build_table_meta_from_column_metadatas;
36
37#[derive(Debug, Serialize, Deserialize)]
38pub struct UpdateTableInfos;
39
40#[async_trait::async_trait]
41#[typetag::serde]
42impl State for UpdateTableInfos {
43 async fn next(
44 &mut self,
45 ctx: &mut ReconcileLogicalTablesContext,
46 _procedure_ctx: &ProcedureContext,
47 ) -> Result<(Box<dyn State>, Status)> {
48 if ctx.persistent_ctx.update_table_infos.is_empty() {
49 return Ok((Box::new(ReconciliationEnd), Status::executing(false)));
50 }
51
52 let all_table_names = ctx
53 .persistent_ctx
54 .logical_table_ids
55 .iter()
56 .cloned()
57 .zip(
58 ctx.persistent_ctx
59 .logical_tables
60 .iter()
61 .map(|t| t.table_ref()),
62 )
63 .collect::<HashMap<_, _>>();
64 let table_ids = ctx
65 .persistent_ctx
66 .update_table_infos
67 .iter()
68 .map(|(table_id, _)| *table_id)
69 .collect::<Vec<_>>();
70 let table_names = table_ids
71 .iter()
72 .map(|table_id| *all_table_names.get(table_id).unwrap())
73 .collect::<Vec<_>>();
74 let table_info_values = get_all_table_info_values_by_table_ids(
75 ctx.table_metadata_manager.table_info_manager(),
76 &table_ids,
77 &table_names,
78 )
79 .await?;
80
81 let mut table_info_values_to_update =
82 Vec::with_capacity(ctx.persistent_ctx.update_table_infos.len());
83 for ((table_id, column_metadatas), table_info_value) in ctx
84 .persistent_ctx
85 .update_table_infos
86 .iter()
87 .zip(table_info_values.into_iter())
88 {
89 let new_table_info = Self::build_new_table_info(
90 *table_id,
91 column_metadatas,
92 &table_info_value.table_info,
93 )?;
94 table_info_values_to_update.push((table_info_value, new_table_info));
95 }
96 let table_id = ctx.table_id();
97 let table_name = ctx.table_name();
98
99 let updated_table_info_num = table_info_values_to_update.len();
100 batch_update_table_info_values(&ctx.table_metadata_manager, table_info_values_to_update)
101 .await?;
102
103 info!(
104 "Updated table infos for logical tables: {:?}, physical table: {}, table_id: {}",
105 ctx.persistent_ctx
106 .update_table_infos
107 .iter()
108 .map(|(table_id, _)| table_id)
109 .collect::<Vec<_>>(),
110 table_id,
111 table_name,
112 );
113
114 let cache_ctx = CacheContext {
115 subject: Some(format!(
116 "Invalidate table by reconcile logical tables, physical_table_id: {}",
117 table_id
118 )),
119 };
120 let idents = Self::build_cache_ident_keys(table_id, table_name, &table_ids, &table_names);
121 ctx.cache_invalidator
122 .invalidate(&cache_ctx, &idents)
123 .await?;
124
125 ctx.persistent_ctx.update_table_infos.clear();
126 let metrics = ctx.mut_metrics();
128 metrics.update_table_info_count = updated_table_info_num;
129 Ok((Box::new(ReconciliationEnd), Status::executing(false)))
130 }
131
132 fn as_any(&self) -> &dyn Any {
133 self
134 }
135}
136
137impl UpdateTableInfos {
138 fn build_new_table_info(
139 table_id: TableId,
140 column_metadatas: &[ColumnMetadata],
141 table_info: &RawTableInfo,
142 ) -> Result<RawTableInfo> {
143 let table_ref = table_info.table_ref();
144 let table_meta = build_table_meta_from_column_metadatas(
145 table_id,
146 table_ref,
147 &table_info.meta,
148 None,
149 column_metadatas,
150 )?;
151
152 let mut new_table_info = table_info.clone();
153 new_table_info.meta = table_meta;
154 new_table_info.ident.version = table_info.ident.version + 1;
155 new_table_info.sort_columns();
156
157 Ok(new_table_info)
158 }
159
160 fn build_cache_ident_keys(
161 physical_table_id: TableId,
162 physical_table_name: &TableName,
163 table_ids: &[TableId],
164 table_names: &[TableReference],
165 ) -> Vec<CacheIdent> {
166 let mut cache_keys = Vec::with_capacity(table_ids.len() * 2 + 2);
167 cache_keys.push(CacheIdent::TableId(physical_table_id));
168 cache_keys.push(CacheIdent::TableName(physical_table_name.clone()));
169 cache_keys.extend(
170 table_ids
171 .iter()
172 .map(|table_id| CacheIdent::TableId(*table_id)),
173 );
174 cache_keys.extend(
175 table_names
176 .iter()
177 .map(|table_ref| CacheIdent::TableName((*table_ref).into())),
178 );
179
180 cache_keys
181 }
182}