common_meta/ddl/
alter_table.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod check;
16mod metadata;
17mod region_request;
18mod update_metadata;
19
20use std::vec;
21
22use api::region::RegionResponse;
23use api::v1::alter_table_expr::Kind;
24use api::v1::RenameTable;
25use async_trait::async_trait;
26use common_error::ext::BoxedError;
27use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
28use common_procedure::{
29    Context as ProcedureContext, ContextProvider, Error as ProcedureError, LockKey, PoisonKey,
30    PoisonKeys, Procedure, ProcedureId, Status, StringKey,
31};
32use common_telemetry::{debug, error, info};
33use futures::future::{self};
34use serde::{Deserialize, Serialize};
35use snafu::{ensure, ResultExt};
36use store_api::storage::RegionId;
37use strum::AsRefStr;
38use table::metadata::{RawTableInfo, TableId, TableInfo};
39use table::table_reference::TableReference;
40
41use crate::cache_invalidator::Context;
42use crate::ddl::utils::{
43    add_peer_context_if_needed, handle_multiple_results, map_to_procedure_error,
44    sync_follower_regions, MultipleResults,
45};
46use crate::ddl::DdlContext;
47use crate::error::{AbortProcedureSnafu, NoLeaderSnafu, PutPoisonSnafu, Result, RetryLaterSnafu};
48use crate::instruction::CacheIdent;
49use crate::key::table_info::TableInfoValue;
50use crate::key::{DeserializedValueWithBytes, RegionDistribution};
51use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
52use crate::metrics;
53use crate::poison_key::table_poison_key;
54use crate::rpc::ddl::AlterTableTask;
55use crate::rpc::router::{find_leader_regions, find_leaders, region_distribution, RegionRoute};
56
57/// The alter table procedure
58pub struct AlterTableProcedure {
59    /// The runtime context.
60    context: DdlContext,
61    /// The serialized data.
62    data: AlterTableData,
63    /// Cached new table metadata in the prepare step.
64    /// If we recover the procedure from json, then the table info value is not cached.
65    /// But we already validated it in the prepare step.
66    new_table_info: Option<TableInfo>,
67}
68
69impl AlterTableProcedure {
70    pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterTable";
71
72    pub fn new(table_id: TableId, task: AlterTableTask, context: DdlContext) -> Result<Self> {
73        task.validate()?;
74        Ok(Self {
75            context,
76            data: AlterTableData::new(task, table_id),
77            new_table_info: None,
78        })
79    }
80
81    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
82        let data: AlterTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
83        Ok(AlterTableProcedure {
84            context,
85            data,
86            new_table_info: None,
87        })
88    }
89
90    // Checks whether the table exists.
91    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
92        self.check_alter().await?;
93        self.fill_table_info().await?;
94
95        // Validates the request and builds the new table info.
96        // We need to build the new table info here because we should ensure the alteration
97        // is valid in `UpdateMeta` state as we already altered the region.
98        // Safety: `fill_table_info()` already set it.
99        let table_info_value = self.data.table_info_value.as_ref().unwrap();
100        self.new_table_info = Some(self.build_new_table_info(&table_info_value.table_info)?);
101
102        // Safety: Checked in `AlterTableProcedure::new`.
103        let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
104        if matches!(alter_kind, Kind::RenameTable { .. }) {
105            self.data.state = AlterTableState::UpdateMetadata;
106        } else {
107            self.data.state = AlterTableState::SubmitAlterRegionRequests;
108        };
109        Ok(Status::executing(true))
110    }
111
112    fn table_poison_key(&self) -> PoisonKey {
113        table_poison_key(self.data.table_id())
114    }
115
116    async fn put_poison(
117        &self,
118        ctx_provider: &dyn ContextProvider,
119        procedure_id: ProcedureId,
120    ) -> Result<()> {
121        let poison_key = self.table_poison_key();
122        ctx_provider
123            .try_put_poison(&poison_key, procedure_id)
124            .await
125            .context(PutPoisonSnafu)
126    }
127
128    pub async fn submit_alter_region_requests(
129        &mut self,
130        procedure_id: ProcedureId,
131        ctx_provider: &dyn ContextProvider,
132    ) -> Result<Status> {
133        let table_id = self.data.table_id();
134        let (_, physical_table_route) = self
135            .context
136            .table_metadata_manager
137            .table_route_manager()
138            .get_physical_table_route(table_id)
139            .await?;
140
141        self.data.region_distribution =
142            Some(region_distribution(&physical_table_route.region_routes));
143
144        let leaders = find_leaders(&physical_table_route.region_routes);
145        let mut alter_region_tasks = Vec::with_capacity(leaders.len());
146        let alter_kind = self.make_region_alter_kind()?;
147
148        info!(
149            "Submitting alter region requests for table {}, table_id: {}, alter_kind: {:?}",
150            self.data.table_ref(),
151            table_id,
152            alter_kind,
153        );
154
155        ensure!(!leaders.is_empty(), NoLeaderSnafu { table_id });
156        // Puts the poison before submitting alter region requests to datanodes.
157        self.put_poison(ctx_provider, procedure_id).await?;
158        for datanode in leaders {
159            let requester = self.context.node_manager.datanode(&datanode).await;
160            let regions = find_leader_regions(&physical_table_route.region_routes, &datanode);
161
162            for region in regions {
163                let region_id = RegionId::new(table_id, region);
164                let request = self.make_alter_region_request(region_id, alter_kind.clone())?;
165                debug!("Submitting {request:?} to {datanode}");
166
167                let datanode = datanode.clone();
168                let requester = requester.clone();
169
170                alter_region_tasks.push(async move {
171                    requester
172                        .handle(request)
173                        .await
174                        .map_err(add_peer_context_if_needed(datanode))
175                });
176            }
177        }
178
179        let results = future::join_all(alter_region_tasks)
180            .await
181            .into_iter()
182            .collect::<Vec<_>>();
183
184        match handle_multiple_results(results) {
185            MultipleResults::PartialRetryable(error) => {
186                // Just returns the error, and wait for the next try.
187                Err(error)
188            }
189            MultipleResults::PartialNonRetryable(error) => {
190                error!(error; "Partial non-retryable errors occurred during alter table, table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
191                // No retry will be done.
192                Ok(Status::poisoned(
193                    Some(self.table_poison_key()),
194                    ProcedureError::external(error),
195                ))
196            }
197            MultipleResults::AllRetryable(error) => {
198                // Just returns the error, and wait for the next try.
199                let err = BoxedError::new(error);
200                Err(err).context(RetryLaterSnafu {
201                    clean_poisons: true,
202                })
203            }
204            MultipleResults::Ok(results) => {
205                self.submit_sync_region_requests(results, &physical_table_route.region_routes)
206                    .await;
207                self.data.state = AlterTableState::UpdateMetadata;
208                Ok(Status::executing_with_clean_poisons(true))
209            }
210            MultipleResults::AllNonRetryable(error) => {
211                error!(error; "All alter requests returned non-retryable errors for table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
212                // It assumes the metadata on datanode is not changed.
213                // Case: The alter region request is sent but not applied. (e.g., InvalidArgument)
214
215                let err = BoxedError::new(error);
216                Err(err).context(AbortProcedureSnafu {
217                    clean_poisons: true,
218                })
219            }
220        }
221    }
222
223    async fn submit_sync_region_requests(
224        &mut self,
225        results: Vec<RegionResponse>,
226        region_routes: &[RegionRoute],
227    ) {
228        // Safety: filled in `prepare` step.
229        let table_info = self.data.table_info().unwrap();
230        if let Err(err) = sync_follower_regions(
231            &self.context,
232            self.data.table_id(),
233            results,
234            region_routes,
235            table_info.meta.engine.as_str(),
236        )
237        .await
238        {
239            error!(err; "Failed to sync regions for table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
240        }
241    }
242
243    /// Update table metadata.
244    pub(crate) async fn on_update_metadata(&mut self) -> Result<Status> {
245        let table_id = self.data.table_id();
246        let table_ref = self.data.table_ref();
247        // Safety: checked before.
248        let table_info_value = self.data.table_info_value.as_ref().unwrap();
249        // Gets the table info from the cache or builds it.
250        let new_info = match &self.new_table_info {
251            Some(cached) => cached.clone(),
252            None => self.build_new_table_info(&table_info_value.table_info)
253                .inspect_err(|e| {
254                    // We already check the table info in the prepare step so this should not happen.
255                    error!(e; "Unable to build info for table {} in update metadata step, table_id: {}", table_ref, table_id);
256                })?,
257        };
258
259        debug!(
260            "Starting update table: {} metadata, new table info {:?}",
261            table_ref.to_string(),
262            new_info
263        );
264
265        // Safety: Checked in `AlterTableProcedure::new`.
266        let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
267        if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind {
268            self.on_update_metadata_for_rename(new_table_name.to_string(), table_info_value)
269                .await?;
270        } else {
271            // region distribution is set in submit_alter_region_requests
272            let region_distribution = self.data.region_distribution.as_ref().unwrap().clone();
273            self.on_update_metadata_for_alter(
274                new_info.into(),
275                region_distribution,
276                table_info_value,
277            )
278            .await?;
279        }
280
281        info!("Updated table metadata for table {table_ref}, table_id: {table_id}, kind: {alter_kind:?}");
282        self.data.state = AlterTableState::InvalidateTableCache;
283        Ok(Status::executing(true))
284    }
285
286    /// Broadcasts the invalidating table cache instructions.
287    async fn on_broadcast(&mut self) -> Result<Status> {
288        let cache_invalidator = &self.context.cache_invalidator;
289
290        cache_invalidator
291            .invalidate(
292                &Context::default(),
293                &[
294                    CacheIdent::TableId(self.data.table_id()),
295                    CacheIdent::TableName(self.data.table_ref().into()),
296                ],
297            )
298            .await?;
299
300        Ok(Status::done())
301    }
302
303    fn lock_key_inner(&self) -> Vec<StringKey> {
304        let mut lock_key = vec![];
305        let table_ref = self.data.table_ref();
306        let table_id = self.data.table_id();
307        lock_key.push(CatalogLock::Read(table_ref.catalog).into());
308        lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
309        lock_key.push(TableLock::Write(table_id).into());
310
311        // Safety: Checked in `AlterTableProcedure::new`.
312        let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
313        if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind {
314            lock_key.push(
315                TableNameLock::new(table_ref.catalog, table_ref.schema, new_table_name).into(),
316            )
317        }
318
319        lock_key
320    }
321}
322
323#[async_trait]
324impl Procedure for AlterTableProcedure {
325    fn type_name(&self) -> &str {
326        Self::TYPE_NAME
327    }
328
329    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
330        let state = &self.data.state;
331
332        let step = state.as_ref();
333
334        let _timer = metrics::METRIC_META_PROCEDURE_ALTER_TABLE
335            .with_label_values(&[step])
336            .start_timer();
337
338        match state {
339            AlterTableState::Prepare => self.on_prepare().await,
340            AlterTableState::SubmitAlterRegionRequests => {
341                self.submit_alter_region_requests(ctx.procedure_id, ctx.provider.as_ref())
342                    .await
343            }
344            AlterTableState::UpdateMetadata => self.on_update_metadata().await,
345            AlterTableState::InvalidateTableCache => self.on_broadcast().await,
346        }
347        .map_err(map_to_procedure_error)
348    }
349
350    fn dump(&self) -> ProcedureResult<String> {
351        serde_json::to_string(&self.data).context(ToJsonSnafu)
352    }
353
354    fn lock_key(&self) -> LockKey {
355        let key = self.lock_key_inner();
356
357        LockKey::new(key)
358    }
359
360    fn poison_keys(&self) -> PoisonKeys {
361        PoisonKeys::new(vec![self.table_poison_key()])
362    }
363}
364
365#[derive(Debug, Serialize, Deserialize, AsRefStr)]
366enum AlterTableState {
367    /// Prepares to alter the table.
368    Prepare,
369    /// Sends alter region requests to Datanode.
370    SubmitAlterRegionRequests,
371    /// Updates table metadata.
372    UpdateMetadata,
373    /// Broadcasts the invalidating table cache instruction.
374    InvalidateTableCache,
375}
376
377// The serialized data of alter table.
378#[derive(Debug, Serialize, Deserialize)]
379pub struct AlterTableData {
380    state: AlterTableState,
381    task: AlterTableTask,
382    table_id: TableId,
383    /// Table info value before alteration.
384    table_info_value: Option<DeserializedValueWithBytes<TableInfoValue>>,
385    /// Region distribution for table in case we need to update region options.
386    region_distribution: Option<RegionDistribution>,
387}
388
389impl AlterTableData {
390    pub fn new(task: AlterTableTask, table_id: TableId) -> Self {
391        Self {
392            state: AlterTableState::Prepare,
393            task,
394            table_id,
395            table_info_value: None,
396            region_distribution: None,
397        }
398    }
399
400    fn table_ref(&self) -> TableReference {
401        self.task.table_ref()
402    }
403
404    fn table_id(&self) -> TableId {
405        self.table_id
406    }
407
408    fn table_info(&self) -> Option<&RawTableInfo> {
409        self.table_info_value
410            .as_ref()
411            .map(|value| &value.table_info)
412    }
413}