Skip to main content

common_meta/ddl/
alter_table.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod executor;
16mod metadata;
17mod region_request;
18
19use std::vec;
20
21use api::region::RegionResponse;
22use api::v1::alter_table_expr::Kind;
23use api::v1::{RenameTable, SetTableOptions, UnsetTableOptions};
24use async_trait::async_trait;
25use common_error::ext::BoxedError;
26use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
27use common_procedure::{
28    Context as ProcedureContext, ContextProvider, Error as ProcedureError, LockKey, PoisonKey,
29    PoisonKeys, Procedure, ProcedureId, Status, StringKey,
30};
31use common_telemetry::{error, info, warn};
32use serde::{Deserialize, Serialize};
33use snafu::{ResultExt, ensure};
34use store_api::metadata::ColumnMetadata;
35use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY;
36use strum::AsRefStr;
37use table::metadata::{TableId, TableInfo};
38use table::requests::REPARTITION_COLUMN_HINT_KEY;
39use table::table_reference::TableReference;
40
41use crate::ddl::DdlContext;
42use crate::ddl::alter_table::executor::AlterTableExecutor;
43use crate::ddl::utils::{
44    MultipleResults, extract_column_metadatas, handle_multiple_results, map_to_procedure_error,
45    sync_follower_regions,
46};
47use crate::error::{AbortProcedureSnafu, NoLeaderSnafu, PutPoisonSnafu, Result, RetryLaterSnafu};
48use crate::key::table_info::TableInfoValue;
49use crate::key::{DeserializedValueWithBytes, RegionDistribution};
50use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
51use crate::metrics;
52use crate::poison_key::table_poison_key;
53use crate::rpc::ddl::AlterTableTask;
54use crate::rpc::router::{RegionRoute, find_leaders, region_distribution};
55
/// The alter table procedure.
///
/// Drives an alter table DDL through the states of `AlterTableState`: prepare,
/// submit alter region requests (skipped for metadata-only alters), update the
/// table metadata, and finally invalidate the table caches.
pub struct AlterTableProcedure {
    /// The runtime context.
    context: DdlContext,
    /// The serialized data; this is what `dump` persists and `from_json` restores.
    data: AlterTableData,
    /// Cached new table metadata in the prepare step.
    /// If we recover the procedure from json, then the table info value is not cached.
    /// But we already validated it in the prepare step.
    new_table_info: Option<TableInfo>,
    /// The alter table executor, built from `data` when the procedure is created or recovered.
    executor: AlterTableExecutor,
}
69
70/// Builds the executor from the [`AlterTableData`].
71///
72/// # Panics
73/// - If the alter kind is not set.
74fn build_executor_from_alter_expr(alter_data: &AlterTableData) -> AlterTableExecutor {
75    let table_name = alter_data.table_ref().into();
76    let table_id = alter_data.table_id;
77    let alter_kind = alter_data.task.alter_table.kind.as_ref().unwrap();
78    let new_table_name = if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind {
79        Some(new_table_name.clone())
80    } else {
81        None
82    };
83    AlterTableExecutor::new(table_name, table_id, new_table_name)
84}
85
86impl AlterTableProcedure {
87    pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterTable";
88
89    pub fn new(table_id: TableId, task: AlterTableTask, context: DdlContext) -> Result<Self> {
90        task.validate()?;
91        let data = AlterTableData::new(task, table_id);
92        let executor = build_executor_from_alter_expr(&data);
93        Ok(Self {
94            context,
95            data,
96            new_table_info: None,
97            executor,
98        })
99    }
100
101    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
102        let data: AlterTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
103        let executor = build_executor_from_alter_expr(&data);
104
105        Ok(AlterTableProcedure {
106            context,
107            data,
108            new_table_info: None,
109            executor,
110        })
111    }
112
113    // Checks whether the table exists.
114    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
115        self.executor
116            .on_prepare(&self.context.table_metadata_manager)
117            .await?;
118        self.fill_table_info().await?;
119
120        // Safety: filled in `fill_table_info`.
121        let table_info_value = self.data.table_info_value.as_ref().unwrap();
122        let new_table_info = AlterTableExecutor::validate_alter_table_expr(
123            &table_info_value.table_info,
124            self.data.task.alter_table.clone(),
125        )?;
126        self.new_table_info = Some(new_table_info);
127
128        // Safety: Checked in `AlterTableProcedure::new`.
129        let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
130        if is_metadata_only_alter(alter_kind) {
131            self.data.state = AlterTableState::UpdateMetadata;
132        } else {
133            self.data.state = AlterTableState::SubmitAlterRegionRequests;
134        };
135        Ok(Status::executing(true))
136    }
137
138    fn table_poison_key(&self) -> PoisonKey {
139        table_poison_key(self.data.table_id())
140    }
141
142    async fn put_poison(
143        &self,
144        ctx_provider: &dyn ContextProvider,
145        procedure_id: ProcedureId,
146    ) -> Result<()> {
147        let poison_key = self.table_poison_key();
148        ctx_provider
149            .try_put_poison(&poison_key, procedure_id)
150            .await
151            .context(PutPoisonSnafu)
152    }
153
154    pub async fn submit_alter_region_requests(
155        &mut self,
156        procedure_id: ProcedureId,
157        ctx_provider: &dyn ContextProvider,
158    ) -> Result<Status> {
159        let table_id = self.data.table_id();
160        let (_, physical_table_route) = self
161            .context
162            .table_metadata_manager
163            .table_route_manager()
164            .get_physical_table_route(table_id)
165            .await?;
166
167        self.data.region_distribution =
168            Some(region_distribution(&physical_table_route.region_routes));
169        let leaders = find_leaders(&physical_table_route.region_routes);
170        let alter_kind = self.make_region_alter_kind()?;
171
172        info!(
173            "Submitting alter region requests for table {}, table_id: {}, alter_kind: {:?}",
174            self.data.table_ref(),
175            table_id,
176            alter_kind,
177        );
178
179        ensure!(!leaders.is_empty(), NoLeaderSnafu { table_id });
180        // Puts the poison before submitting alter region requests to datanodes.
181        self.put_poison(ctx_provider, procedure_id).await?;
182        let results = self
183            .executor
184            .on_alter_regions(
185                &self.context.node_manager,
186                &physical_table_route.region_routes,
187                alter_kind,
188            )
189            .await;
190
191        match handle_multiple_results(results) {
192            MultipleResults::PartialRetryable(error) => {
193                // Just returns the error, and wait for the next try.
194                Err(error)
195            }
196            MultipleResults::PartialNonRetryable(error) => {
197                error!(error; "Partial non-retryable errors occurred during alter table, table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
198                // No retry will be done.
199                Ok(Status::poisoned(
200                    Some(self.table_poison_key()),
201                    ProcedureError::external(error),
202                ))
203            }
204            MultipleResults::AllRetryable(error) => {
205                // Just returns the error, and wait for the next try.
206                let err = BoxedError::new(error);
207                Err(err).context(RetryLaterSnafu {
208                    clean_poisons: true,
209                })
210            }
211            MultipleResults::Ok(results) => {
212                self.submit_sync_region_requests(&results, &physical_table_route.region_routes)
213                    .await;
214                self.handle_alter_region_response(results)?;
215                Ok(Status::executing_with_clean_poisons(true))
216            }
217            MultipleResults::AllNonRetryable(error) => {
218                error!(error; "All alter requests returned non-retryable errors for table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
219                // It assumes the metadata on datanode is not changed.
220                // Case: The alter region request is sent but not applied. (e.g., InvalidArgument)
221
222                let err = BoxedError::new(error);
223                Err(err).context(AbortProcedureSnafu {
224                    clean_poisons: true,
225                })
226            }
227        }
228    }
229
230    fn handle_alter_region_response(&mut self, mut results: Vec<RegionResponse>) -> Result<()> {
231        if let Some(column_metadatas) =
232            extract_column_metadatas(&mut results, TABLE_COLUMN_METADATA_EXTENSION_KEY)?
233        {
234            self.data.column_metadatas = column_metadatas;
235        } else {
236            warn!(
237                "altering table result doesn't contains extension key `{TABLE_COLUMN_METADATA_EXTENSION_KEY}`,leaving the table's column metadata unchanged"
238            );
239        }
240        self.data.state = AlterTableState::UpdateMetadata;
241        Ok(())
242    }
243
244    async fn submit_sync_region_requests(
245        &mut self,
246        results: &[RegionResponse],
247        region_routes: &[RegionRoute],
248    ) {
249        // Safety: filled in `prepare` step.
250        let table_info = self.data.table_info().unwrap();
251        if let Err(err) = sync_follower_regions(
252            &self.context,
253            self.data.table_id(),
254            results,
255            region_routes,
256            table_info.meta.engine.as_str(),
257        )
258        .await
259        {
260            error!(err; "Failed to sync regions for table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
261        }
262    }
263
264    /// Update table metadata.
265    pub(crate) async fn on_update_metadata(&mut self) -> Result<Status> {
266        let table_id = self.data.table_id();
267        let table_ref = self.data.table_ref();
268        // Safety: filled in `fill_table_info`.
269        let table_info_value = self.data.table_info_value.as_ref().unwrap();
270        // Safety: Checked in `AlterTableProcedure::new`.
271        let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
272        let metadata_only_alter = is_metadata_only_alter(alter_kind);
273
274        // Gets the table info from the cache or builds it.
275        let  new_info = match &self.new_table_info {
276            Some(cached) => cached.clone(),
277            None => AlterTableExecutor::validate_alter_table_expr(
278                &table_info_value.table_info,
279                self.data.task.alter_table.clone(),
280               )
281                .inspect_err(|e| {
282                    // We already check the table info in the prepare step so this should not happen.
283                    error!(e; "Unable to build info for table {} in update metadata step, table_id: {}", table_ref, table_id);
284                })?,
285        };
286
287        // Safety: region distribution is set in `submit_alter_region_requests`.
288        self.executor
289            .on_alter_metadata(
290                &self.context.table_metadata_manager,
291                table_info_value,
292                self.data.region_distribution.as_ref(),
293                new_info,
294                &self.data.column_metadatas,
295                metadata_only_alter,
296            )
297            .await?;
298
299        info!(
300            "Updated table metadata for table {table_ref}, table_id: {table_id}, kind: {alter_kind:?}"
301        );
302        self.data.state = AlterTableState::InvalidateTableCache;
303        Ok(Status::executing(true))
304    }
305
306    /// Broadcasts the invalidating table cache instructions.
307    async fn on_broadcast(&mut self) -> Result<Status> {
308        self.executor
309            .invalidate_table_cache(&self.context.cache_invalidator)
310            .await?;
311        Ok(Status::done())
312    }
313
314    fn lock_key_inner(&self) -> Vec<StringKey> {
315        let mut lock_key = vec![];
316        let table_ref = self.data.table_ref();
317        let table_id = self.data.table_id();
318        lock_key.push(CatalogLock::Read(table_ref.catalog).into());
319        lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
320        lock_key.push(TableLock::Write(table_id).into());
321
322        // Safety: Checked in `AlterTableProcedure::new`.
323        let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
324        if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind {
325            lock_key.push(
326                TableNameLock::new(table_ref.catalog, table_ref.schema, new_table_name).into(),
327            )
328        }
329
330        lock_key
331    }
332
333    #[cfg(test)]
334    pub(crate) fn data(&self) -> &AlterTableData {
335        &self.data
336    }
337
338    #[cfg(test)]
339    pub(crate) fn mut_data(&mut self) -> &mut AlterTableData {
340        &mut self.data
341    }
342}
343
344fn is_metadata_only_alter(alter_kind: &Kind) -> bool {
345    match alter_kind {
346        Kind::RenameTable { .. } => true,
347        Kind::SetTableOptions(SetTableOptions { table_options }) => {
348            table_options.len() == 1 && table_options[0].key.as_str() == REPARTITION_COLUMN_HINT_KEY
349        }
350        Kind::UnsetTableOptions(UnsetTableOptions { keys }) => {
351            keys.len() == 1 && keys[0].as_str() == REPARTITION_COLUMN_HINT_KEY
352        }
353        _ => false,
354    }
355}
356
357#[async_trait]
358impl Procedure for AlterTableProcedure {
359    fn type_name(&self) -> &str {
360        Self::TYPE_NAME
361    }
362
363    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
364        let state = &self.data.state;
365
366        let step = state.as_ref();
367
368        let _timer = metrics::METRIC_META_PROCEDURE_ALTER_TABLE
369            .with_label_values(&[step])
370            .start_timer();
371
372        match state {
373            AlterTableState::Prepare => self.on_prepare().await,
374            AlterTableState::SubmitAlterRegionRequests => {
375                self.submit_alter_region_requests(ctx.procedure_id, ctx.provider.as_ref())
376                    .await
377            }
378            AlterTableState::UpdateMetadata => self.on_update_metadata().await,
379            AlterTableState::InvalidateTableCache => self.on_broadcast().await,
380        }
381        .map_err(map_to_procedure_error)
382    }
383
384    fn dump(&self) -> ProcedureResult<String> {
385        serde_json::to_string(&self.data).context(ToJsonSnafu)
386    }
387
388    fn lock_key(&self) -> LockKey {
389        let key = self.lock_key_inner();
390
391        LockKey::new(key)
392    }
393
394    fn poison_keys(&self) -> PoisonKeys {
395        PoisonKeys::new(vec![self.table_poison_key()])
396    }
397}
398
399#[derive(Debug, Serialize, Deserialize, AsRefStr)]
400enum AlterTableState {
401    /// Prepares to alter the table.
402    Prepare,
403    /// Sends alter region requests to Datanode.
404    SubmitAlterRegionRequests,
405    /// Updates table metadata.
406    UpdateMetadata,
407    /// Broadcasts the invalidating table cache instruction.
408    InvalidateTableCache,
409}
410
/// The serialized data of alter table.
///
/// This is what `AlterTableProcedure` persists in `dump` and restores in `from_json`.
#[derive(Debug, Serialize, Deserialize)]
pub struct AlterTableData {
    /// The current step of the procedure.
    state: AlterTableState,
    /// The alter table task being executed.
    task: AlterTableTask,
    /// Id of the table being altered.
    table_id: TableId,
    /// Column metadata extracted from the alter region responses, if the datanodes
    /// returned any. Defaults to empty when absent from the serialized data.
    #[serde(default)]
    column_metadatas: Vec<ColumnMetadata>,
    /// Table info value before alteration.
    table_info_value: Option<DeserializedValueWithBytes<TableInfoValue>>,
    /// Region distribution for table in case we need to update region options.
    /// Only recorded when alter region requests are submitted.
    region_distribution: Option<RegionDistribution>,
}
424
425impl AlterTableData {
426    pub fn new(task: AlterTableTask, table_id: TableId) -> Self {
427        Self {
428            state: AlterTableState::Prepare,
429            task,
430            table_id,
431            column_metadatas: vec![],
432            table_info_value: None,
433            region_distribution: None,
434        }
435    }
436
437    fn table_ref(&self) -> TableReference<'_> {
438        self.task.table_ref()
439    }
440
441    fn table_id(&self) -> TableId {
442        self.table_id
443    }
444
445    fn table_info(&self) -> Option<&TableInfo> {
446        self.table_info_value
447            .as_ref()
448            .map(|value| &value.table_info)
449    }
450
451    #[cfg(test)]
452    pub(crate) fn column_metadatas(&self) -> &[ColumnMetadata] {
453        &self.column_metadatas
454    }
455
456    #[cfg(test)]
457    pub(crate) fn set_column_metadatas(&mut self, column_metadatas: Vec<ColumnMetadata>) {
458        self.column_metadatas = column_metadatas;
459    }
460}