common_meta/reconciliation/reconcile_table/
update_table_info.rs1use std::any::Any;
16
17use common_procedure::{Context as ProcedureContext, Status};
18use common_telemetry::info;
19use serde::{Deserialize, Serialize};
20use store_api::metadata::ColumnMetadata;
21use tonic::async_trait;
22
23use crate::cache_invalidator::Context as CacheContext;
24use crate::error::Result;
25use crate::instruction::CacheIdent;
26use crate::key::table_info::TableInfoValue;
27use crate::key::DeserializedValueWithBytes;
28use crate::reconciliation::reconcile_table::reconciliation_end::ReconciliationEnd;
29use crate::reconciliation::reconcile_table::{ReconcileTableContext, State};
30use crate::rpc::router::region_distribution;
31
32#[derive(Debug, Serialize, Deserialize)]
34pub struct UpdateTableInfo {
35 table_info_value: DeserializedValueWithBytes<TableInfoValue>,
36 column_metadatas: Vec<ColumnMetadata>,
37}
38
39impl UpdateTableInfo {
40 pub fn new(
41 table_info_value: DeserializedValueWithBytes<TableInfoValue>,
42 column_metadatas: Vec<ColumnMetadata>,
43 ) -> Self {
44 Self {
45 table_info_value,
46 column_metadatas,
47 }
48 }
49}
50
51#[async_trait]
52#[typetag::serde]
53impl State for UpdateTableInfo {
54 async fn next(
55 &mut self,
56 ctx: &mut ReconcileTableContext,
57 _procedure_ctx: &ProcedureContext,
58 ) -> Result<(Box<dyn State>, Status)> {
59 let new_table_meta = match &ctx.volatile_ctx.table_meta {
60 Some(table_meta) => table_meta.clone(),
61 None => ctx.build_table_meta(&self.column_metadatas)?,
62 };
63
64 let region_routes = &ctx
65 .persistent_ctx
66 .physical_table_route
67 .as_ref()
68 .unwrap()
69 .region_routes;
70 let region_distribution = region_distribution(region_routes);
71 let current_table_info_value = ctx.persistent_ctx.table_info_value.as_ref().unwrap();
72 let new_table_info = {
73 let mut new_table_info = current_table_info_value.table_info.clone();
74 new_table_info.meta = new_table_meta;
75 new_table_info
76 };
77
78 if new_table_info.meta == current_table_info_value.table_info.meta {
79 info!(
80 "Table info is already up to date for table: {}, table_id: {}",
81 ctx.table_name(),
82 ctx.table_id()
83 );
84 return Ok((Box::new(ReconciliationEnd), Status::executing(true)));
85 }
86
87 info!(
88 "Updating table info for table: {}, table_id: {}. new table meta: {:?}, current table meta: {:?}",
89 ctx.table_name(),
90 ctx.table_id(),
91 new_table_info.meta,
92 current_table_info_value.table_info.meta,
93 );
94 ctx.table_metadata_manager
95 .update_table_info(
96 current_table_info_value,
97 Some(region_distribution),
98 new_table_info,
99 )
100 .await?;
101
102 let table_ref = ctx.table_name().table_ref();
103 let table_id = ctx.table_id();
104 let cache_ctx = CacheContext {
105 subject: Some(format!(
106 "Invalidate table cache by reconciling table {}, table_id: {}",
107 table_ref, table_id,
108 )),
109 };
110 ctx.cache_invalidator
111 .invalidate(
112 &cache_ctx,
113 &[
114 CacheIdent::TableName(table_ref.into()),
115 CacheIdent::TableId(table_id),
116 ],
117 )
118 .await?;
119 let metrics = ctx.mut_metrics();
121 metrics.update_table_info = true;
122
123 Ok((Box::new(ReconciliationEnd), Status::executing(true)))
124 }
125
126 fn as_any(&self) -> &dyn Any {
127 self
128 }
129}