common_meta/reconciliation/reconcile_logical_tables/
update_table_infos.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17
18use common_procedure::{Context as ProcedureContext, Status};
19use common_telemetry::info;
20use serde::{Deserialize, Serialize};
21use store_api::metadata::ColumnMetadata;
22use store_api::storage::TableId;
23use table::metadata::RawTableInfo;
24use table::table_name::TableName;
25use table::table_reference::TableReference;
26
27use crate::cache_invalidator::Context as CacheContext;
28use crate::ddl::utils::table_info::{
29    batch_update_table_info_values, get_all_table_info_values_by_table_ids,
30};
31use crate::error::Result;
32use crate::instruction::CacheIdent;
33use crate::reconciliation::reconcile_logical_tables::reconciliation_end::ReconciliationEnd;
34use crate::reconciliation::reconcile_logical_tables::{ReconcileLogicalTablesContext, State};
35use crate::reconciliation::utils::build_table_meta_from_column_metadatas;
36
/// Reconciliation state that rewrites the stored table infos of the logical
/// tables queued in the context's `update_table_infos` list.
///
/// The state itself carries no data; everything it needs is read from the
/// [`ReconcileLogicalTablesContext`] passed to [`State::next`].
#[derive(Debug, Serialize, Deserialize)]
pub struct UpdateTableInfos;
39
40#[async_trait::async_trait]
41#[typetag::serde]
42impl State for UpdateTableInfos {
43    async fn next(
44        &mut self,
45        ctx: &mut ReconcileLogicalTablesContext,
46        _procedure_ctx: &ProcedureContext,
47    ) -> Result<(Box<dyn State>, Status)> {
48        if ctx.persistent_ctx.update_table_infos.is_empty() {
49            return Ok((Box::new(ReconciliationEnd), Status::executing(false)));
50        }
51
52        let all_table_names = ctx
53            .persistent_ctx
54            .logical_table_ids
55            .iter()
56            .cloned()
57            .zip(
58                ctx.persistent_ctx
59                    .logical_tables
60                    .iter()
61                    .map(|t| t.table_ref()),
62            )
63            .collect::<HashMap<_, _>>();
64        let table_ids = ctx
65            .persistent_ctx
66            .update_table_infos
67            .iter()
68            .map(|(table_id, _)| *table_id)
69            .collect::<Vec<_>>();
70        let table_names = table_ids
71            .iter()
72            .map(|table_id| *all_table_names.get(table_id).unwrap())
73            .collect::<Vec<_>>();
74        let table_info_values = get_all_table_info_values_by_table_ids(
75            ctx.table_metadata_manager.table_info_manager(),
76            &table_ids,
77            &table_names,
78        )
79        .await?;
80
81        let mut table_info_values_to_update =
82            Vec::with_capacity(ctx.persistent_ctx.update_table_infos.len());
83        for ((table_id, column_metadatas), table_info_value) in ctx
84            .persistent_ctx
85            .update_table_infos
86            .iter()
87            .zip(table_info_values.into_iter())
88        {
89            let new_table_info = Self::build_new_table_info(
90                *table_id,
91                column_metadatas,
92                &table_info_value.table_info,
93            )?;
94            table_info_values_to_update.push((table_info_value, new_table_info));
95        }
96        let table_id = ctx.table_id();
97        let table_name = ctx.table_name();
98
99        let updated_table_info_num = table_info_values_to_update.len();
100        batch_update_table_info_values(&ctx.table_metadata_manager, table_info_values_to_update)
101            .await?;
102
103        info!(
104            "Updated table infos for logical tables: {:?}, physical table: {}, table_id: {}",
105            ctx.persistent_ctx
106                .update_table_infos
107                .iter()
108                .map(|(table_id, _)| table_id)
109                .collect::<Vec<_>>(),
110            table_id,
111            table_name,
112        );
113
114        let cache_ctx = CacheContext {
115            subject: Some(format!(
116                "Invalidate table by reconcile logical tables, physical_table_id: {}",
117                table_id
118            )),
119        };
120        let idents = Self::build_cache_ident_keys(table_id, table_name, &table_ids, &table_names);
121        ctx.cache_invalidator
122            .invalidate(&cache_ctx, &idents)
123            .await?;
124
125        ctx.persistent_ctx.update_table_infos.clear();
126        // Update metrics.
127        let metrics = ctx.mut_metrics();
128        metrics.update_table_info_count = updated_table_info_num;
129        Ok((Box::new(ReconciliationEnd), Status::executing(false)))
130    }
131
132    fn as_any(&self) -> &dyn Any {
133        self
134    }
135}
136
137impl UpdateTableInfos {
138    fn build_new_table_info(
139        table_id: TableId,
140        column_metadatas: &[ColumnMetadata],
141        table_info: &RawTableInfo,
142    ) -> Result<RawTableInfo> {
143        let table_ref = table_info.table_ref();
144        let table_meta = build_table_meta_from_column_metadatas(
145            table_id,
146            table_ref,
147            &table_info.meta,
148            None,
149            column_metadatas,
150        )?;
151
152        let mut new_table_info = table_info.clone();
153        new_table_info.meta = table_meta;
154        new_table_info.ident.version = table_info.ident.version + 1;
155        new_table_info.sort_columns();
156
157        Ok(new_table_info)
158    }
159
160    fn build_cache_ident_keys(
161        physical_table_id: TableId,
162        physical_table_name: &TableName,
163        table_ids: &[TableId],
164        table_names: &[TableReference],
165    ) -> Vec<CacheIdent> {
166        let mut cache_keys = Vec::with_capacity(table_ids.len() * 2 + 2);
167        cache_keys.push(CacheIdent::TableId(physical_table_id));
168        cache_keys.push(CacheIdent::TableName(physical_table_name.clone()));
169        cache_keys.extend(
170            table_ids
171                .iter()
172                .map(|table_id| CacheIdent::TableId(*table_id)),
173        );
174        cache_keys.extend(
175            table_names
176                .iter()
177                .map(|table_ref| CacheIdent::TableName((*table_ref).into())),
178        );
179
180        cache_keys
181    }
182}