common_meta/ddl/
alter_table.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod executor;
16mod metadata;
17mod region_request;
18
19use std::vec;
20
21use api::region::RegionResponse;
22use api::v1::alter_table_expr::Kind;
23use api::v1::RenameTable;
24use async_trait::async_trait;
25use common_error::ext::BoxedError;
26use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
27use common_procedure::{
28    Context as ProcedureContext, ContextProvider, Error as ProcedureError, LockKey, PoisonKey,
29    PoisonKeys, Procedure, ProcedureId, Status, StringKey,
30};
31use common_telemetry::{error, info, warn};
32use serde::{Deserialize, Serialize};
33use snafu::{ensure, ResultExt};
34use store_api::metadata::ColumnMetadata;
35use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY;
36use strum::AsRefStr;
37use table::metadata::{RawTableInfo, TableId, TableInfo};
38use table::table_reference::TableReference;
39
40use crate::ddl::alter_table::executor::AlterTableExecutor;
41use crate::ddl::utils::{
42    extract_column_metadatas, handle_multiple_results, map_to_procedure_error,
43    sync_follower_regions, MultipleResults,
44};
45use crate::ddl::DdlContext;
46use crate::error::{AbortProcedureSnafu, NoLeaderSnafu, PutPoisonSnafu, Result, RetryLaterSnafu};
47use crate::key::table_info::TableInfoValue;
48use crate::key::{DeserializedValueWithBytes, RegionDistribution};
49use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
50use crate::metrics;
51use crate::poison_key::table_poison_key;
52use crate::rpc::ddl::AlterTableTask;
53use crate::rpc::router::{find_leaders, region_distribution, RegionRoute};
54
/// The alter table procedure.
///
/// Only the `data` field is written out by `dump`; the other fields are rebuilt
/// whenever the procedure is created with `new` or restored with `from_json`.
pub struct AlterTableProcedure {
    /// The runtime context.
    context: DdlContext,
    /// The serialized data.
    data: AlterTableData,
    /// Cached new table metadata in the prepare step.
    /// If we recover the procedure from json, then the table info value is not cached.
    /// But we already validated it in the prepare step.
    new_table_info: Option<TableInfo>,
    /// The alter table executor, built from `data` by `build_executor_from_alter_expr`.
    executor: AlterTableExecutor,
}
68
69/// Builds the executor from the [`AlterTableData`].
70///
71/// # Panics
72/// - If the alter kind is not set.
73fn build_executor_from_alter_expr(alter_data: &AlterTableData) -> AlterTableExecutor {
74    let table_name = alter_data.table_ref().into();
75    let table_id = alter_data.table_id;
76    let alter_kind = alter_data.task.alter_table.kind.as_ref().unwrap();
77    let new_table_name = if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind {
78        Some(new_table_name.to_string())
79    } else {
80        None
81    };
82    AlterTableExecutor::new(table_name, table_id, new_table_name)
83}
84
85impl AlterTableProcedure {
86    pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterTable";
87
88    pub fn new(table_id: TableId, task: AlterTableTask, context: DdlContext) -> Result<Self> {
89        task.validate()?;
90        let data = AlterTableData::new(task, table_id);
91        let executor = build_executor_from_alter_expr(&data);
92        Ok(Self {
93            context,
94            data,
95            new_table_info: None,
96            executor,
97        })
98    }
99
100    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
101        let data: AlterTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
102        let executor = build_executor_from_alter_expr(&data);
103
104        Ok(AlterTableProcedure {
105            context,
106            data,
107            new_table_info: None,
108            executor,
109        })
110    }
111
112    // Checks whether the table exists.
113    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
114        self.executor
115            .on_prepare(&self.context.table_metadata_manager)
116            .await?;
117        self.fill_table_info().await?;
118
119        // Safety: filled in `fill_table_info`.
120        let table_info_value = self.data.table_info_value.as_ref().unwrap();
121        let new_table_info = AlterTableExecutor::validate_alter_table_expr(
122            &table_info_value.table_info,
123            self.data.task.alter_table.clone(),
124        )?;
125        self.new_table_info = Some(new_table_info);
126
127        // Safety: Checked in `AlterTableProcedure::new`.
128        let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
129        if matches!(alter_kind, Kind::RenameTable { .. }) {
130            self.data.state = AlterTableState::UpdateMetadata;
131        } else {
132            self.data.state = AlterTableState::SubmitAlterRegionRequests;
133        };
134        Ok(Status::executing(true))
135    }
136
137    fn table_poison_key(&self) -> PoisonKey {
138        table_poison_key(self.data.table_id())
139    }
140
141    async fn put_poison(
142        &self,
143        ctx_provider: &dyn ContextProvider,
144        procedure_id: ProcedureId,
145    ) -> Result<()> {
146        let poison_key = self.table_poison_key();
147        ctx_provider
148            .try_put_poison(&poison_key, procedure_id)
149            .await
150            .context(PutPoisonSnafu)
151    }
152
153    pub async fn submit_alter_region_requests(
154        &mut self,
155        procedure_id: ProcedureId,
156        ctx_provider: &dyn ContextProvider,
157    ) -> Result<Status> {
158        let table_id = self.data.table_id();
159        let (_, physical_table_route) = self
160            .context
161            .table_metadata_manager
162            .table_route_manager()
163            .get_physical_table_route(table_id)
164            .await?;
165
166        self.data.region_distribution =
167            Some(region_distribution(&physical_table_route.region_routes));
168        let leaders = find_leaders(&physical_table_route.region_routes);
169        let alter_kind = self.make_region_alter_kind()?;
170
171        info!(
172            "Submitting alter region requests for table {}, table_id: {}, alter_kind: {:?}",
173            self.data.table_ref(),
174            table_id,
175            alter_kind,
176        );
177
178        ensure!(!leaders.is_empty(), NoLeaderSnafu { table_id });
179        // Puts the poison before submitting alter region requests to datanodes.
180        self.put_poison(ctx_provider, procedure_id).await?;
181        let results = self
182            .executor
183            .on_alter_regions(
184                &self.context.node_manager,
185                &physical_table_route.region_routes,
186                alter_kind,
187            )
188            .await;
189
190        match handle_multiple_results(results) {
191            MultipleResults::PartialRetryable(error) => {
192                // Just returns the error, and wait for the next try.
193                Err(error)
194            }
195            MultipleResults::PartialNonRetryable(error) => {
196                error!(error; "Partial non-retryable errors occurred during alter table, table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
197                // No retry will be done.
198                Ok(Status::poisoned(
199                    Some(self.table_poison_key()),
200                    ProcedureError::external(error),
201                ))
202            }
203            MultipleResults::AllRetryable(error) => {
204                // Just returns the error, and wait for the next try.
205                let err = BoxedError::new(error);
206                Err(err).context(RetryLaterSnafu {
207                    clean_poisons: true,
208                })
209            }
210            MultipleResults::Ok(results) => {
211                self.submit_sync_region_requests(&results, &physical_table_route.region_routes)
212                    .await;
213                self.handle_alter_region_response(results)?;
214                Ok(Status::executing_with_clean_poisons(true))
215            }
216            MultipleResults::AllNonRetryable(error) => {
217                error!(error; "All alter requests returned non-retryable errors for table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
218                // It assumes the metadata on datanode is not changed.
219                // Case: The alter region request is sent but not applied. (e.g., InvalidArgument)
220
221                let err = BoxedError::new(error);
222                Err(err).context(AbortProcedureSnafu {
223                    clean_poisons: true,
224                })
225            }
226        }
227    }
228
229    fn handle_alter_region_response(&mut self, mut results: Vec<RegionResponse>) -> Result<()> {
230        if let Some(column_metadatas) =
231            extract_column_metadatas(&mut results, TABLE_COLUMN_METADATA_EXTENSION_KEY)?
232        {
233            self.data.column_metadatas = column_metadatas;
234        } else {
235            warn!("altering table result doesn't contains extension key `{TABLE_COLUMN_METADATA_EXTENSION_KEY}`,leaving the table's column metadata unchanged");
236        }
237        self.data.state = AlterTableState::UpdateMetadata;
238        Ok(())
239    }
240
241    async fn submit_sync_region_requests(
242        &mut self,
243        results: &[RegionResponse],
244        region_routes: &[RegionRoute],
245    ) {
246        // Safety: filled in `prepare` step.
247        let table_info = self.data.table_info().unwrap();
248        if let Err(err) = sync_follower_regions(
249            &self.context,
250            self.data.table_id(),
251            results,
252            region_routes,
253            table_info.meta.engine.as_str(),
254        )
255        .await
256        {
257            error!(err; "Failed to sync regions for table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
258        }
259    }
260
261    /// Update table metadata.
262    pub(crate) async fn on_update_metadata(&mut self) -> Result<Status> {
263        let table_id = self.data.table_id();
264        let table_ref = self.data.table_ref();
265        // Safety: filled in `fill_table_info`.
266        let table_info_value = self.data.table_info_value.as_ref().unwrap();
267        // Safety: Checked in `AlterTableProcedure::new`.
268        let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
269
270        // Gets the table info from the cache or builds it.
271        let  new_info = match &self.new_table_info {
272            Some(cached) => cached.clone(),
273            None => AlterTableExecutor::validate_alter_table_expr(
274                &table_info_value.table_info,
275                self.data.task.alter_table.clone(),
276               )
277                .inspect_err(|e| {
278                    // We already check the table info in the prepare step so this should not happen.
279                    error!(e; "Unable to build info for table {} in update metadata step, table_id: {}", table_ref, table_id);
280                })?,
281        };
282
283        // Safety: region distribution is set in `submit_alter_region_requests`.
284        self.executor
285            .on_alter_metadata(
286                &self.context.table_metadata_manager,
287                table_info_value,
288                self.data.region_distribution.as_ref(),
289                new_info.into(),
290                &self.data.column_metadatas,
291            )
292            .await?;
293
294        info!("Updated table metadata for table {table_ref}, table_id: {table_id}, kind: {alter_kind:?}");
295        self.data.state = AlterTableState::InvalidateTableCache;
296        Ok(Status::executing(true))
297    }
298
299    /// Broadcasts the invalidating table cache instructions.
300    async fn on_broadcast(&mut self) -> Result<Status> {
301        self.executor
302            .invalidate_table_cache(&self.context.cache_invalidator)
303            .await?;
304        Ok(Status::done())
305    }
306
307    fn lock_key_inner(&self) -> Vec<StringKey> {
308        let mut lock_key = vec![];
309        let table_ref = self.data.table_ref();
310        let table_id = self.data.table_id();
311        lock_key.push(CatalogLock::Read(table_ref.catalog).into());
312        lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
313        lock_key.push(TableLock::Write(table_id).into());
314
315        // Safety: Checked in `AlterTableProcedure::new`.
316        let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
317        if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind {
318            lock_key.push(
319                TableNameLock::new(table_ref.catalog, table_ref.schema, new_table_name).into(),
320            )
321        }
322
323        lock_key
324    }
325
    /// Returns a shared reference to the procedure's serialized data.
    /// Only compiled for tests.
    #[cfg(test)]
    pub(crate) fn data(&self) -> &AlterTableData {
        &self.data
    }
330
    /// Returns a mutable reference to the procedure's serialized data.
    /// Only compiled for tests.
    #[cfg(test)]
    pub(crate) fn mut_data(&mut self) -> &mut AlterTableData {
        &mut self.data
    }
335}
336
337#[async_trait]
338impl Procedure for AlterTableProcedure {
339    fn type_name(&self) -> &str {
340        Self::TYPE_NAME
341    }
342
343    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
344        let state = &self.data.state;
345
346        let step = state.as_ref();
347
348        let _timer = metrics::METRIC_META_PROCEDURE_ALTER_TABLE
349            .with_label_values(&[step])
350            .start_timer();
351
352        match state {
353            AlterTableState::Prepare => self.on_prepare().await,
354            AlterTableState::SubmitAlterRegionRequests => {
355                self.submit_alter_region_requests(ctx.procedure_id, ctx.provider.as_ref())
356                    .await
357            }
358            AlterTableState::UpdateMetadata => self.on_update_metadata().await,
359            AlterTableState::InvalidateTableCache => self.on_broadcast().await,
360        }
361        .map_err(map_to_procedure_error)
362    }
363
364    fn dump(&self) -> ProcedureResult<String> {
365        serde_json::to_string(&self.data).context(ToJsonSnafu)
366    }
367
368    fn lock_key(&self) -> LockKey {
369        let key = self.lock_key_inner();
370
371        LockKey::new(key)
372    }
373
374    fn poison_keys(&self) -> PoisonKeys {
375        PoisonKeys::new(vec![self.table_poison_key()])
376    }
377}
378
/// The step the alter table procedure executes next.
///
/// The state is serialized as part of `AlterTableData`, and its `AsRefStr` string
/// is used as the label of the step timer in `execute`.
#[derive(Debug, Serialize, Deserialize, AsRefStr)]
enum AlterTableState {
    /// Prepares to alter the table.
    Prepare,
    /// Sends alter region requests to Datanode.
    /// Not entered when the alter kind is a rename.
    SubmitAlterRegionRequests,
    /// Updates table metadata.
    UpdateMetadata,
    /// Broadcasts the invalidating table cache instruction.
    InvalidateTableCache,
}
390
/// The serialized data of alter table.
///
/// This is the value `AlterTableProcedure::dump` writes as JSON and
/// `AlterTableProcedure::from_json` reads back.
#[derive(Debug, Serialize, Deserialize)]
pub struct AlterTableData {
    /// The step to execute next.
    state: AlterTableState,
    /// The alter table task being executed.
    task: AlterTableTask,
    /// The id of the table to alter.
    table_id: TableId,
    /// Column metadata extracted from the alter region responses; left unchanged
    /// when the responses carry none. `#[serde(default)]` lets JSON that lacks this
    /// field deserialize to an empty list.
    #[serde(default)]
    column_metadatas: Vec<ColumnMetadata>,
    /// Table info value before alteration.
    table_info_value: Option<DeserializedValueWithBytes<TableInfoValue>>,
    /// Region distribution for table in case we need to update region options.
    region_distribution: Option<RegionDistribution>,
}
404
405impl AlterTableData {
406    pub fn new(task: AlterTableTask, table_id: TableId) -> Self {
407        Self {
408            state: AlterTableState::Prepare,
409            task,
410            table_id,
411            column_metadatas: vec![],
412            table_info_value: None,
413            region_distribution: None,
414        }
415    }
416
417    fn table_ref(&self) -> TableReference {
418        self.task.table_ref()
419    }
420
421    fn table_id(&self) -> TableId {
422        self.table_id
423    }
424
425    fn table_info(&self) -> Option<&RawTableInfo> {
426        self.table_info_value
427            .as_ref()
428            .map(|value| &value.table_info)
429    }
430
431    #[cfg(test)]
432    pub(crate) fn column_metadatas(&self) -> &[ColumnMetadata] {
433        &self.column_metadatas
434    }
435
436    #[cfg(test)]
437    pub(crate) fn set_column_metadatas(&mut self, column_metadatas: Vec<ColumnMetadata>) {
438        self.column_metadatas = column_metadatas;
439    }
440}