common_meta/ddl/
alter_table.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15mod executor;
16mod metadata;
17mod region_request;
18
19use std::vec;
20
21use api::region::RegionResponse;
22use api::v1::alter_table_expr::Kind;
23use api::v1::RenameTable;
24use async_trait::async_trait;
25use common_error::ext::BoxedError;
26use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
27use common_procedure::{
28    Context as ProcedureContext, ContextProvider, Error as ProcedureError, LockKey, PoisonKey,
29    PoisonKeys, Procedure, ProcedureId, Status, StringKey,
30};
31use common_telemetry::{error, info, warn};
32use serde::{Deserialize, Serialize};
33use snafu::{ensure, ResultExt};
34use store_api::metadata::ColumnMetadata;
35use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY;
36use strum::AsRefStr;
37use table::metadata::{RawTableInfo, TableId, TableInfo};
38use table::table_reference::TableReference;
39
40use crate::ddl::alter_table::executor::AlterTableExecutor;
41use crate::ddl::utils::{
42    extract_column_metadatas, handle_multiple_results, map_to_procedure_error,
43    sync_follower_regions, MultipleResults,
44};
45use crate::ddl::DdlContext;
46use crate::error::{AbortProcedureSnafu, NoLeaderSnafu, PutPoisonSnafu, Result, RetryLaterSnafu};
47use crate::key::table_info::TableInfoValue;
48use crate::key::{DeserializedValueWithBytes, RegionDistribution};
49use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
50use crate::metrics;
51use crate::poison_key::table_poison_key;
52use crate::rpc::ddl::AlterTableTask;
53use crate::rpc::router::{find_leaders, region_distribution, RegionRoute};
54
/// The alter table procedure.
///
/// Holds everything needed to run one alter table task. Only `data` is written out by
/// `dump` and read back by `from_json`; the other fields are rebuilt or start empty on
/// recovery.
pub struct AlterTableProcedure {
    /// The runtime context.
    context: DdlContext,
    /// The serialized data.
    data: AlterTableData,
    /// Cached new table metadata in the prepare step.
    /// If we recover the procedure from json, then the table info value is not cached.
    /// But we already validated it in the prepare step.
    new_table_info: Option<TableInfo>,
    /// The alter table executor.
    /// Built from `data` by `build_executor_from_alter_expr` in both constructors.
    executor: AlterTableExecutor,
}
68
69/// Builds the executor from the [`AlterTableData`].
70///
71/// # Panics
72/// - If the alter kind is not set.
73fn build_executor_from_alter_expr(alter_data: &AlterTableData) -> AlterTableExecutor {
74    let table_name = alter_data.table_ref().into();
75    let table_id = alter_data.table_id;
76    let alter_kind = alter_data.task.alter_table.kind.as_ref().unwrap();
77    let new_table_name = if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind {
78        Some(new_table_name.to_string())
79    } else {
80        None
81    };
82    AlterTableExecutor::new(table_name, table_id, new_table_name)
83}
84
85impl AlterTableProcedure {
    /// The type name of this procedure, as returned by `Procedure::type_name`.
    pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterTable";
87
88    pub fn new(table_id: TableId, task: AlterTableTask, context: DdlContext) -> Result<Self> {
89        task.validate()?;
90        let data = AlterTableData::new(task, table_id);
91        let executor = build_executor_from_alter_expr(&data);
92        Ok(Self {
93            context,
94            data,
95            new_table_info: None,
96            executor,
97        })
98    }
99
100    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
101        let data: AlterTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
102        let executor = build_executor_from_alter_expr(&data);
103
104        Ok(AlterTableProcedure {
105            context,
106            data,
107            new_table_info: None,
108            executor,
109        })
110    }
111
112    // Checks whether the table exists.
113    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
114        self.executor
115            .on_prepare(&self.context.table_metadata_manager)
116            .await?;
117        self.fill_table_info().await?;
118
119        // Safety: filled in `fill_table_info`.
120        let table_info_value = self.data.table_info_value.as_ref().unwrap();
121        let new_table_info = AlterTableExecutor::validate_alter_table_expr(
122            &table_info_value.table_info,
123            self.data.task.alter_table.clone(),
124        )?;
125        self.new_table_info = Some(new_table_info);
126
127        // Safety: Checked in `AlterTableProcedure::new`.
128        let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
129        if matches!(alter_kind, Kind::RenameTable { .. }) {
130            self.data.state = AlterTableState::UpdateMetadata;
131        } else {
132            self.data.state = AlterTableState::SubmitAlterRegionRequests;
133        };
134        Ok(Status::executing(true))
135    }
136
137    fn table_poison_key(&self) -> PoisonKey {
138        table_poison_key(self.data.table_id())
139    }
140
141    async fn put_poison(
142        &self,
143        ctx_provider: &dyn ContextProvider,
144        procedure_id: ProcedureId,
145    ) -> Result<()> {
146        let poison_key = self.table_poison_key();
147        ctx_provider
148            .try_put_poison(&poison_key, procedure_id)
149            .await
150            .context(PutPoisonSnafu)
151    }
152
153    pub async fn submit_alter_region_requests(
154        &mut self,
155        procedure_id: ProcedureId,
156        ctx_provider: &dyn ContextProvider,
157    ) -> Result<Status> {
158        let table_id = self.data.table_id();
159        let (_, physical_table_route) = self
160            .context
161            .table_metadata_manager
162            .table_route_manager()
163            .get_physical_table_route(table_id)
164            .await?;
165
166        self.data.region_distribution =
167            Some(region_distribution(&physical_table_route.region_routes));
168        let leaders = find_leaders(&physical_table_route.region_routes);
169        let alter_kind = self.make_region_alter_kind()?;
170
171        info!(
172            "Submitting alter region requests for table {}, table_id: {}, alter_kind: {:?}",
173            self.data.table_ref(),
174            table_id,
175            alter_kind,
176        );
177
178        ensure!(!leaders.is_empty(), NoLeaderSnafu { table_id });
179        // Puts the poison before submitting alter region requests to datanodes.
180        self.put_poison(ctx_provider, procedure_id).await?;
181        let results = self
182            .executor
183            .on_alter_regions(
184                &self.context.node_manager,
185                &physical_table_route.region_routes,
186                alter_kind,
187            )
188            .await;
189
190        match handle_multiple_results(results) {
191            MultipleResults::PartialRetryable(error) => {
192                // Just returns the error, and wait for the next try.
193                Err(error)
194            }
195            MultipleResults::PartialNonRetryable(error) => {
196                error!(error; "Partial non-retryable errors occurred during alter table, table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
197                // No retry will be done.
198                Ok(Status::poisoned(
199                    Some(self.table_poison_key()),
200                    ProcedureError::external(error),
201                ))
202            }
203            MultipleResults::AllRetryable(error) => {
204                // Just returns the error, and wait for the next try.
205                let err = BoxedError::new(error);
206                Err(err).context(RetryLaterSnafu {
207                    clean_poisons: true,
208                })
209            }
210            MultipleResults::Ok(results) => {
211                self.submit_sync_region_requests(&results, &physical_table_route.region_routes)
212                    .await;
213                self.handle_alter_region_response(results)?;
214                Ok(Status::executing_with_clean_poisons(true))
215            }
216            MultipleResults::AllNonRetryable(error) => {
217                error!(error; "All alter requests returned non-retryable errors for table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
218                // It assumes the metadata on datanode is not changed.
219                // Case: The alter region request is sent but not applied. (e.g., InvalidArgument)
220
221                let err = BoxedError::new(error);
222                Err(err).context(AbortProcedureSnafu {
223                    clean_poisons: true,
224                })
225            }
226        }
227    }
228
229    fn handle_alter_region_response(&mut self, mut results: Vec<RegionResponse>) -> Result<()> {
230        self.data.state = AlterTableState::UpdateMetadata;
231        if let Some(column_metadatas) =
232            extract_column_metadatas(&mut results, TABLE_COLUMN_METADATA_EXTENSION_KEY)?
233        {
234            self.data.column_metadatas = column_metadatas;
235        } else {
236            warn!("altering table result doesn't contains extension key `{TABLE_COLUMN_METADATA_EXTENSION_KEY}`,leaving the table's column metadata unchanged");
237        }
238
239        Ok(())
240    }
241
242    async fn submit_sync_region_requests(
243        &mut self,
244        results: &[RegionResponse],
245        region_routes: &[RegionRoute],
246    ) {
247        // Safety: filled in `prepare` step.
248        let table_info = self.data.table_info().unwrap();
249        if let Err(err) = sync_follower_regions(
250            &self.context,
251            self.data.table_id(),
252            results,
253            region_routes,
254            table_info.meta.engine.as_str(),
255        )
256        .await
257        {
258            error!(err; "Failed to sync regions for table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
259        }
260    }
261
262    /// Update table metadata.
263    pub(crate) async fn on_update_metadata(&mut self) -> Result<Status> {
264        let table_id = self.data.table_id();
265        let table_ref = self.data.table_ref();
266        // Safety: filled in `fill_table_info`.
267        let table_info_value = self.data.table_info_value.as_ref().unwrap();
268        // Safety: Checked in `AlterTableProcedure::new`.
269        let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
270
271        // Gets the table info from the cache or builds it.
272        let  new_info = match &self.new_table_info {
273            Some(cached) => cached.clone(),
274            None => AlterTableExecutor::validate_alter_table_expr(
275                &table_info_value.table_info,
276                self.data.task.alter_table.clone(),
277               )
278                .inspect_err(|e| {
279                    // We already check the table info in the prepare step so this should not happen.
280                    error!(e; "Unable to build info for table {} in update metadata step, table_id: {}", table_ref, table_id);
281                })?,
282        };
283
284        // Safety: region distribution is set in `submit_alter_region_requests`.
285        self.executor
286            .on_alter_metadata(
287                &self.context.table_metadata_manager,
288                table_info_value,
289                self.data.region_distribution.as_ref(),
290                new_info.into(),
291                &self.data.column_metadatas,
292            )
293            .await?;
294
295        info!("Updated table metadata for table {table_ref}, table_id: {table_id}, kind: {alter_kind:?}");
296        self.data.state = AlterTableState::InvalidateTableCache;
297        Ok(Status::executing(true))
298    }
299
300    /// Broadcasts the invalidating table cache instructions.
301    async fn on_broadcast(&mut self) -> Result<Status> {
302        self.executor
303            .invalidate_table_cache(&self.context.cache_invalidator)
304            .await?;
305        Ok(Status::done())
306    }
307
308    fn lock_key_inner(&self) -> Vec<StringKey> {
309        let mut lock_key = vec![];
310        let table_ref = self.data.table_ref();
311        let table_id = self.data.table_id();
312        lock_key.push(CatalogLock::Read(table_ref.catalog).into());
313        lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
314        lock_key.push(TableLock::Write(table_id).into());
315
316        // Safety: Checked in `AlterTableProcedure::new`.
317        let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
318        if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind {
319            lock_key.push(
320                TableNameLock::new(table_ref.catalog, table_ref.schema, new_table_name).into(),
321            )
322        }
323
324        lock_key
325    }
326
    /// Returns the procedure data. Only compiled for tests.
    #[cfg(test)]
    pub(crate) fn data(&self) -> &AlterTableData {
        &self.data
    }
331
    /// Returns the procedure data for mutation. Only compiled for tests.
    #[cfg(test)]
    pub(crate) fn mut_data(&mut self) -> &mut AlterTableData {
        &mut self.data
    }
336}
337
338#[async_trait]
339impl Procedure for AlterTableProcedure {
340    fn type_name(&self) -> &str {
341        Self::TYPE_NAME
342    }
343
344    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
345        let state = &self.data.state;
346
347        let step = state.as_ref();
348
349        let _timer = metrics::METRIC_META_PROCEDURE_ALTER_TABLE
350            .with_label_values(&[step])
351            .start_timer();
352
353        match state {
354            AlterTableState::Prepare => self.on_prepare().await,
355            AlterTableState::SubmitAlterRegionRequests => {
356                self.submit_alter_region_requests(ctx.procedure_id, ctx.provider.as_ref())
357                    .await
358            }
359            AlterTableState::UpdateMetadata => self.on_update_metadata().await,
360            AlterTableState::InvalidateTableCache => self.on_broadcast().await,
361        }
362        .map_err(map_to_procedure_error)
363    }
364
365    fn dump(&self) -> ProcedureResult<String> {
366        serde_json::to_string(&self.data).context(ToJsonSnafu)
367    }
368
369    fn lock_key(&self) -> LockKey {
370        let key = self.lock_key_inner();
371
372        LockKey::new(key)
373    }
374
375    fn poison_keys(&self) -> PoisonKeys {
376        PoisonKeys::new(vec![self.table_poison_key()])
377    }
378}
379
/// The states of the alter table procedure.
///
/// `execute` dispatches on the current state, and the variant name (via `AsRefStr`) is used
/// as the label of the step timer metric.
#[derive(Debug, Serialize, Deserialize, AsRefStr)]
enum AlterTableState {
    /// Prepares to alter the table.
    Prepare,
    /// Sends alter region requests to Datanode.
    SubmitAlterRegionRequests,
    /// Updates table metadata.
    UpdateMetadata,
    /// Broadcasts the invalidating table cache instruction.
    InvalidateTableCache,
}
391
/// The serialized data of alter table.
///
/// This is what `dump` writes and `from_json` reads back.
#[derive(Debug, Serialize, Deserialize)]
pub struct AlterTableData {
    /// The current state of the procedure.
    state: AlterTableState,
    /// The alter table task being executed.
    task: AlterTableTask,
    /// The id of the table being altered.
    table_id: TableId,
    /// Column metadata taken from the alter region responses; empty until then.
    /// Falls back to an empty list when the field is absent from the serialized data.
    #[serde(default)]
    column_metadatas: Vec<ColumnMetadata>,
    /// Table info value before alteration.
    table_info_value: Option<DeserializedValueWithBytes<TableInfoValue>>,
    /// Region distribution for table in case we need to update region options.
    region_distribution: Option<RegionDistribution>,
}
405
406impl AlterTableData {
407    pub fn new(task: AlterTableTask, table_id: TableId) -> Self {
408        Self {
409            state: AlterTableState::Prepare,
410            task,
411            table_id,
412            column_metadatas: vec![],
413            table_info_value: None,
414            region_distribution: None,
415        }
416    }
417
418    fn table_ref(&self) -> TableReference {
419        self.task.table_ref()
420    }
421
422    fn table_id(&self) -> TableId {
423        self.table_id
424    }
425
426    fn table_info(&self) -> Option<&RawTableInfo> {
427        self.table_info_value
428            .as_ref()
429            .map(|value| &value.table_info)
430    }
431
432    #[cfg(test)]
433    pub(crate) fn column_metadatas(&self) -> &[ColumnMetadata] {
434        &self.column_metadatas
435    }
436
437    #[cfg(test)]
438    pub(crate) fn set_column_metadatas(&mut self, column_metadatas: Vec<ColumnMetadata>) {
439        self.column_metadatas = column_metadatas;
440    }
441}