common_meta/ddl/
alter_logical_tables.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod executor;
16mod update_metadata;
17mod validator;
18
19use api::region::RegionResponse;
20use async_trait::async_trait;
21use common_catalog::format_full_table_name;
22use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
23use common_procedure::{Context, LockKey, Procedure, Status};
24use common_telemetry::{debug, error, info, warn};
25pub use executor::make_alter_region_request;
26use serde::{Deserialize, Serialize};
27use snafu::ResultExt;
28use store_api::metadata::ColumnMetadata;
29use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
30use strum::AsRefStr;
31use table::metadata::TableId;
32
33use crate::cache_invalidator::Context as CacheContext;
34use crate::ddl::alter_logical_tables::executor::AlterLogicalTablesExecutor;
35use crate::ddl::alter_logical_tables::validator::{
36    retain_unskipped, AlterLogicalTableValidator, ValidatorResult,
37};
38use crate::ddl::utils::{extract_column_metadatas, map_to_procedure_error, sync_follower_regions};
39use crate::ddl::DdlContext;
40use crate::error::Result;
41use crate::instruction::CacheIdent;
42use crate::key::table_info::TableInfoValue;
43use crate::key::table_route::PhysicalTableRouteValue;
44use crate::key::DeserializedValueWithBytes;
45use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
46use crate::metrics;
47use crate::rpc::ddl::AlterTableTask;
48use crate::rpc::router::RegionRoute;
49
50pub struct AlterLogicalTablesProcedure {
51    pub context: DdlContext,
52    pub data: AlterTablesData,
53}
54
55/// Builds the validator from the [`AlterTablesData`].
56fn build_validator_from_alter_table_data<'a>(
57    data: &'a AlterTablesData,
58) -> AlterLogicalTableValidator<'a> {
59    let phsycial_table_id = data.physical_table_id;
60    let alters = data
61        .tasks
62        .iter()
63        .map(|task| &task.alter_table)
64        .collect::<Vec<_>>();
65    AlterLogicalTableValidator::new(phsycial_table_id, alters)
66}
67
68/// Builds the executor from the [`AlterTablesData`].
69fn build_executor_from_alter_expr<'a>(data: &'a AlterTablesData) -> AlterLogicalTablesExecutor<'a> {
70    debug_assert_eq!(data.tasks.len(), data.table_info_values.len());
71    let alters = data
72        .tasks
73        .iter()
74        .zip(data.table_info_values.iter())
75        .map(|(task, table_info)| (table_info.table_info.ident.table_id, &task.alter_table))
76        .collect::<Vec<_>>();
77    AlterLogicalTablesExecutor::new(alters)
78}
79
80impl AlterLogicalTablesProcedure {
81    pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterLogicalTables";
82
83    pub fn new(
84        tasks: Vec<AlterTableTask>,
85        physical_table_id: TableId,
86        context: DdlContext,
87    ) -> Self {
88        Self {
89            context,
90            data: AlterTablesData {
91                state: AlterTablesState::Prepare,
92                tasks,
93                table_info_values: vec![],
94                physical_table_id,
95                physical_table_info: None,
96                physical_table_route: None,
97                physical_columns: vec![],
98                table_cache_keys_to_invalidate: vec![],
99            },
100        }
101    }
102
103    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
104        let data = serde_json::from_str(json).context(FromJsonSnafu)?;
105        Ok(Self { context, data })
106    }
107
108    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
109        let validator = build_validator_from_alter_table_data(&self.data);
110        let ValidatorResult {
111            num_skipped,
112            skip_alter,
113            table_info_values,
114            physical_table_info,
115            physical_table_route,
116        } = validator
117            .validate(&self.context.table_metadata_manager)
118            .await?;
119
120        let num_tasks = self.data.tasks.len();
121        if num_skipped == num_tasks {
122            info!("All the alter tasks are finished, will skip the procedure.");
123            let cache_ident_keys = AlterLogicalTablesExecutor::build_cache_ident_keys(
124                &physical_table_info,
125                &table_info_values
126                    .iter()
127                    .map(|v| v.get_inner_ref())
128                    .collect::<Vec<_>>(),
129            );
130            self.data.table_cache_keys_to_invalidate = cache_ident_keys;
131            // Re-invalidate the table cache
132            self.data.state = AlterTablesState::InvalidateTableCache;
133            return Ok(Status::executing(true));
134        } else if num_skipped > 0 {
135            info!(
136                "There are {} alter tasks, {} of them were already finished.",
137                num_tasks, num_skipped
138            );
139        }
140
141        // Updates the procedure state.
142        retain_unskipped(&mut self.data.tasks, &skip_alter);
143        self.data.physical_table_info = Some(physical_table_info);
144        self.data.physical_table_route = Some(physical_table_route);
145        self.data.table_info_values = table_info_values;
146        debug_assert_eq!(self.data.tasks.len(), self.data.table_info_values.len());
147        self.data.state = AlterTablesState::SubmitAlterRegionRequests;
148        Ok(Status::executing(true))
149    }
150
151    pub(crate) async fn on_submit_alter_region_requests(&mut self) -> Result<Status> {
152        // Safety: we have checked the state in on_prepare
153        let physical_table_route = &self.data.physical_table_route.as_ref().unwrap();
154        let executor = build_executor_from_alter_expr(&self.data);
155        let mut results = executor
156            .on_alter_regions(
157                &self.context.node_manager,
158                &physical_table_route.region_routes,
159            )
160            .await?;
161
162        if let Some(column_metadatas) =
163            extract_column_metadatas(&mut results, ALTER_PHYSICAL_EXTENSION_KEY)?
164        {
165            self.data.physical_columns = column_metadatas;
166        } else {
167            warn!("altering logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged");
168        }
169        self.submit_sync_region_requests(results, &physical_table_route.region_routes)
170            .await;
171        self.data.state = AlterTablesState::UpdateMetadata;
172        Ok(Status::executing(true))
173    }
174
175    async fn submit_sync_region_requests(
176        &self,
177        results: Vec<RegionResponse>,
178        region_routes: &[RegionRoute],
179    ) {
180        let table_info = &self.data.physical_table_info.as_ref().unwrap().table_info;
181        if let Err(err) = sync_follower_regions(
182            &self.context,
183            self.data.physical_table_id,
184            &results,
185            region_routes,
186            table_info.meta.engine.as_str(),
187        )
188        .await
189        {
190            error!(err; "Failed to sync regions for table {}, table_id: {}",
191                        format_full_table_name(&table_info.catalog_name, &table_info.schema_name, &table_info.name),
192                        self.data.physical_table_id
193            );
194        }
195    }
196
197    pub(crate) async fn on_update_metadata(&mut self) -> Result<Status> {
198        self.update_physical_table_metadata().await?;
199        self.update_logical_tables_metadata().await?;
200
201        let logical_table_info_values = self
202            .data
203            .table_info_values
204            .iter()
205            .map(|v| v.get_inner_ref())
206            .collect::<Vec<_>>();
207
208        let cache_ident_keys = AlterLogicalTablesExecutor::build_cache_ident_keys(
209            self.data.physical_table_info.as_ref().unwrap(),
210            &logical_table_info_values,
211        );
212        self.data.table_cache_keys_to_invalidate = cache_ident_keys;
213        self.data.clear_metadata_fields();
214
215        self.data.state = AlterTablesState::InvalidateTableCache;
216        Ok(Status::executing(true))
217    }
218
219    pub(crate) async fn on_invalidate_table_cache(&mut self) -> Result<Status> {
220        let to_invalidate = &self.data.table_cache_keys_to_invalidate;
221
222        let ctx = CacheContext {
223            subject: Some(format!(
224                "Invalidate table cache by altering logical tables, physical_table_id: {}",
225                self.data.physical_table_id,
226            )),
227        };
228
229        self.context
230            .cache_invalidator
231            .invalidate(&ctx, to_invalidate)
232            .await?;
233        Ok(Status::done())
234    }
235}
236
237#[async_trait]
238impl Procedure for AlterLogicalTablesProcedure {
239    fn type_name(&self) -> &str {
240        Self::TYPE_NAME
241    }
242
243    async fn execute(&mut self, _ctx: &Context) -> ProcedureResult<Status> {
244        let state = &self.data.state;
245
246        let step = state.as_ref();
247
248        let _timer = metrics::METRIC_META_PROCEDURE_ALTER_TABLE
249            .with_label_values(&[step])
250            .start_timer();
251        debug!(
252            "Executing alter logical tables procedure, state: {:?}",
253            state
254        );
255
256        match state {
257            AlterTablesState::Prepare => self.on_prepare().await,
258            AlterTablesState::SubmitAlterRegionRequests => {
259                self.on_submit_alter_region_requests().await
260            }
261            AlterTablesState::UpdateMetadata => self.on_update_metadata().await,
262            AlterTablesState::InvalidateTableCache => self.on_invalidate_table_cache().await,
263        }
264        .map_err(map_to_procedure_error)
265    }
266
267    fn dump(&self) -> ProcedureResult<String> {
268        serde_json::to_string(&self.data).context(ToJsonSnafu)
269    }
270
271    fn lock_key(&self) -> LockKey {
272        // CatalogLock, SchemaLock,
273        // TableLock
274        // TableNameLock(s)
275        let mut lock_key = Vec::with_capacity(2 + 1 + self.data.tasks.len());
276        let table_ref = self.data.tasks[0].table_ref();
277        lock_key.push(CatalogLock::Read(table_ref.catalog).into());
278        lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
279        lock_key.push(TableLock::Write(self.data.physical_table_id).into());
280        lock_key.extend(
281            self.data
282                .table_info_values
283                .iter()
284                .map(|table| TableLock::Write(table.table_info.ident.table_id).into()),
285        );
286
287        LockKey::new(lock_key)
288    }
289}
290
291#[derive(Debug, Serialize, Deserialize)]
292pub struct AlterTablesData {
293    state: AlterTablesState,
294    tasks: Vec<AlterTableTask>,
295    /// Table info values before the alter operation.
296    /// Corresponding one-to-one with the AlterTableTask in tasks.
297    table_info_values: Vec<DeserializedValueWithBytes<TableInfoValue>>,
298    /// Physical table info
299    physical_table_id: TableId,
300    physical_table_info: Option<DeserializedValueWithBytes<TableInfoValue>>,
301    physical_table_route: Option<PhysicalTableRouteValue>,
302    physical_columns: Vec<ColumnMetadata>,
303    table_cache_keys_to_invalidate: Vec<CacheIdent>,
304}
305
306impl AlterTablesData {
307    /// Clears all data fields except `state` and `table_cache_keys_to_invalidate` after metadata update.
308    /// This is done to avoid persisting unnecessary data after the update metadata step.
309    fn clear_metadata_fields(&mut self) {
310        self.tasks.clear();
311        self.table_info_values.clear();
312        self.physical_table_id = 0;
313        self.physical_table_info = None;
314        self.physical_table_route = None;
315        self.physical_columns.clear();
316    }
317}
318
319#[derive(Debug, Serialize, Deserialize, AsRefStr)]
320enum AlterTablesState {
321    /// Prepares to alter the table
322    Prepare,
323    SubmitAlterRegionRequests,
324    /// Updates table metadata.
325    UpdateMetadata,
326    /// Broadcasts the invalidating table cache instruction.
327    InvalidateTableCache,
328}