common_meta/reconciliation/reconcile_table/
update_table_info.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use common_procedure::{Context as ProcedureContext, Status};
18use common_telemetry::info;
19use serde::{Deserialize, Serialize};
20use store_api::metadata::ColumnMetadata;
21use tonic::async_trait;
22
23use crate::cache_invalidator::Context as CacheContext;
24use crate::error::Result;
25use crate::instruction::CacheIdent;
26use crate::key::table_info::TableInfoValue;
27use crate::key::DeserializedValueWithBytes;
28use crate::reconciliation::reconcile_table::reconciliation_end::ReconciliationEnd;
29use crate::reconciliation::reconcile_table::{ReconcileTableContext, State};
30use crate::rpc::router::region_distribution;
31
32/// Updates the table info with the new column metadatas.
33#[derive(Debug, Serialize, Deserialize)]
34pub struct UpdateTableInfo {
35    table_info_value: DeserializedValueWithBytes<TableInfoValue>,
36    column_metadatas: Vec<ColumnMetadata>,
37}
38
39impl UpdateTableInfo {
40    pub fn new(
41        table_info_value: DeserializedValueWithBytes<TableInfoValue>,
42        column_metadatas: Vec<ColumnMetadata>,
43    ) -> Self {
44        Self {
45            table_info_value,
46            column_metadatas,
47        }
48    }
49}
50
51#[async_trait]
52#[typetag::serde]
53impl State for UpdateTableInfo {
54    async fn next(
55        &mut self,
56        ctx: &mut ReconcileTableContext,
57        _procedure_ctx: &ProcedureContext,
58    ) -> Result<(Box<dyn State>, Status)> {
59        let new_table_meta = match &ctx.volatile_ctx.table_meta {
60            Some(table_meta) => table_meta.clone(),
61            None => ctx.build_table_meta(&self.column_metadatas)?,
62        };
63
64        let region_routes = &ctx
65            .persistent_ctx
66            .physical_table_route
67            .as_ref()
68            .unwrap()
69            .region_routes;
70        let region_distribution = region_distribution(region_routes);
71        let current_table_info_value = ctx.persistent_ctx.table_info_value.as_ref().unwrap();
72        let new_table_info = {
73            let mut new_table_info = current_table_info_value.table_info.clone();
74            new_table_info.meta = new_table_meta;
75            new_table_info
76        };
77
78        if new_table_info.meta == current_table_info_value.table_info.meta {
79            info!(
80                "Table info is already up to date for table: {}, table_id: {}",
81                ctx.table_name(),
82                ctx.table_id()
83            );
84            return Ok((Box::new(ReconciliationEnd), Status::executing(true)));
85        }
86
87        info!(
88            "Updating table info for table: {}, table_id: {}. new table meta: {:?}, current table meta: {:?}",
89            ctx.table_name(),
90            ctx.table_id(),
91            new_table_info.meta,
92            current_table_info_value.table_info.meta,
93        );
94        ctx.table_metadata_manager
95            .update_table_info(
96                current_table_info_value,
97                Some(region_distribution),
98                new_table_info,
99            )
100            .await?;
101
102        let table_ref = ctx.table_name().table_ref();
103        let table_id = ctx.table_id();
104        let cache_ctx = CacheContext {
105            subject: Some(format!(
106                "Invalidate table cache by reconciling table {}, table_id: {}",
107                table_ref, table_id,
108            )),
109        };
110        ctx.cache_invalidator
111            .invalidate(
112                &cache_ctx,
113                &[
114                    CacheIdent::TableName(table_ref.into()),
115                    CacheIdent::TableId(table_id),
116                ],
117            )
118            .await?;
119        // Update metrics.
120        let metrics = ctx.mut_metrics();
121        metrics.update_table_info = true;
122
123        Ok((Box::new(ReconciliationEnd), Status::executing(true)))
124    }
125
126    fn as_any(&self) -> &dyn Any {
127        self
128    }
129}