common_meta/ddl/
alter_table.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod check;
16mod metadata;
17mod region_request;
18mod update_metadata;
19
20use std::vec;
21
22use api::region::RegionResponse;
23use api::v1::alter_table_expr::Kind;
24use api::v1::RenameTable;
25use async_trait::async_trait;
26use common_error::ext::BoxedError;
27use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
28use common_procedure::{
29    Context as ProcedureContext, ContextProvider, Error as ProcedureError, LockKey, PoisonKey,
30    PoisonKeys, Procedure, ProcedureId, Status, StringKey,
31};
32use common_telemetry::{debug, error, info};
33use futures::future::{self};
34use serde::{Deserialize, Serialize};
35use snafu::{ensure, ResultExt};
36use store_api::storage::RegionId;
37use strum::AsRefStr;
38use table::metadata::{RawTableInfo, TableId, TableInfo};
39use table::table_reference::TableReference;
40
41use crate::cache_invalidator::Context;
42use crate::ddl::utils::{
43    add_peer_context_if_needed, handle_multiple_results, sync_follower_regions, MultipleResults,
44};
45use crate::ddl::DdlContext;
46use crate::error::{AbortProcedureSnafu, Error, NoLeaderSnafu, PutPoisonSnafu, Result};
47use crate::instruction::CacheIdent;
48use crate::key::table_info::TableInfoValue;
49use crate::key::{DeserializedValueWithBytes, RegionDistribution};
50use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
51use crate::metrics;
52use crate::poison_key::table_poison_key;
53use crate::rpc::ddl::AlterTableTask;
54use crate::rpc::router::{find_leader_regions, find_leaders, region_distribution, RegionRoute};
55
/// The alter table procedure.
///
/// The procedure is a small state machine: the current step lives in
/// `data.state` and `Procedure::execute` dispatches on it.
pub struct AlterTableProcedure {
    /// The runtime context.
    context: DdlContext,
    /// The serialized data. This is the part written by `dump()` and restored by
    /// `from_json()`.
    data: AlterTableData,
    /// Cached new table metadata in the prepare step.
    /// If we recover the procedure from json, then the table info value is not cached
    /// (`from_json()` sets it to `None`) and the update-metadata step rebuilds it.
    /// But we already validated it in the prepare step.
    new_table_info: Option<TableInfo>,
}
67
68impl AlterTableProcedure {
69    pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterTable";
70
71    pub fn new(table_id: TableId, task: AlterTableTask, context: DdlContext) -> Result<Self> {
72        task.validate()?;
73        Ok(Self {
74            context,
75            data: AlterTableData::new(task, table_id),
76            new_table_info: None,
77        })
78    }
79
80    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
81        let data: AlterTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
82        Ok(AlterTableProcedure {
83            context,
84            data,
85            new_table_info: None,
86        })
87    }
88
89    // Checks whether the table exists.
90    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
91        self.check_alter().await?;
92        self.fill_table_info().await?;
93
94        // Validates the request and builds the new table info.
95        // We need to build the new table info here because we should ensure the alteration
96        // is valid in `UpdateMeta` state as we already altered the region.
97        // Safety: `fill_table_info()` already set it.
98        let table_info_value = self.data.table_info_value.as_ref().unwrap();
99        self.new_table_info = Some(self.build_new_table_info(&table_info_value.table_info)?);
100
101        // Safety: Checked in `AlterTableProcedure::new`.
102        let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
103        if matches!(alter_kind, Kind::RenameTable { .. }) {
104            self.data.state = AlterTableState::UpdateMetadata;
105        } else {
106            self.data.state = AlterTableState::SubmitAlterRegionRequests;
107        };
108        Ok(Status::executing(true))
109    }
110
111    fn table_poison_key(&self) -> PoisonKey {
112        table_poison_key(self.data.table_id())
113    }
114
115    async fn put_poison(
116        &self,
117        ctx_provider: &dyn ContextProvider,
118        procedure_id: ProcedureId,
119    ) -> Result<()> {
120        let poison_key = self.table_poison_key();
121        ctx_provider
122            .try_put_poison(&poison_key, procedure_id)
123            .await
124            .context(PutPoisonSnafu)
125    }
126
127    pub async fn submit_alter_region_requests(
128        &mut self,
129        procedure_id: ProcedureId,
130        ctx_provider: &dyn ContextProvider,
131    ) -> Result<Status> {
132        let table_id = self.data.table_id();
133        let (_, physical_table_route) = self
134            .context
135            .table_metadata_manager
136            .table_route_manager()
137            .get_physical_table_route(table_id)
138            .await?;
139
140        self.data.region_distribution =
141            Some(region_distribution(&physical_table_route.region_routes));
142
143        let leaders = find_leaders(&physical_table_route.region_routes);
144        let mut alter_region_tasks = Vec::with_capacity(leaders.len());
145        let alter_kind = self.make_region_alter_kind()?;
146
147        info!(
148            "Submitting alter region requests for table {}, table_id: {}, alter_kind: {:?}",
149            self.data.table_ref(),
150            table_id,
151            alter_kind,
152        );
153
154        ensure!(!leaders.is_empty(), NoLeaderSnafu { table_id });
155        // Puts the poison before submitting alter region requests to datanodes.
156        self.put_poison(ctx_provider, procedure_id).await?;
157        for datanode in leaders {
158            let requester = self.context.node_manager.datanode(&datanode).await;
159            let regions = find_leader_regions(&physical_table_route.region_routes, &datanode);
160
161            for region in regions {
162                let region_id = RegionId::new(table_id, region);
163                let request = self.make_alter_region_request(region_id, alter_kind.clone())?;
164                debug!("Submitting {request:?} to {datanode}");
165
166                let datanode = datanode.clone();
167                let requester = requester.clone();
168
169                alter_region_tasks.push(async move {
170                    requester
171                        .handle(request)
172                        .await
173                        .map_err(add_peer_context_if_needed(datanode))
174                });
175            }
176        }
177
178        let results = future::join_all(alter_region_tasks)
179            .await
180            .into_iter()
181            .collect::<Vec<_>>();
182
183        match handle_multiple_results(results) {
184            MultipleResults::PartialRetryable(error) => {
185                // Just returns the error, and wait for the next try.
186                Err(error)
187            }
188            MultipleResults::PartialNonRetryable(error) => {
189                error!(error; "Partial non-retryable errors occurred during alter table, table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
190                // No retry will be done.
191                Ok(Status::poisoned(
192                    Some(self.table_poison_key()),
193                    ProcedureError::external(error),
194                ))
195            }
196            MultipleResults::AllRetryable(error) => {
197                // Just returns the error, and wait for the next try.
198                Err(error)
199            }
200            MultipleResults::Ok(results) => {
201                self.submit_sync_region_requests(results, &physical_table_route.region_routes)
202                    .await;
203                self.data.state = AlterTableState::UpdateMetadata;
204                Ok(Status::executing_with_clean_poisons(true))
205            }
206            MultipleResults::AllNonRetryable(error) => {
207                error!(error; "All alter requests returned non-retryable errors for table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
208                // It assumes the metadata on datanode is not changed.
209                // Case: The alter region request is sent but not applied. (e.g., InvalidArgument)
210
211                let err = BoxedError::new(error);
212                Err(err).context(AbortProcedureSnafu {
213                    clean_poisons: true,
214                })
215            }
216        }
217    }
218
219    async fn submit_sync_region_requests(
220        &mut self,
221        results: Vec<RegionResponse>,
222        region_routes: &[RegionRoute],
223    ) {
224        // Safety: filled in `prepare` step.
225        let table_info = self.data.table_info().unwrap();
226        if let Err(err) = sync_follower_regions(
227            &self.context,
228            self.data.table_id(),
229            results,
230            region_routes,
231            table_info.meta.engine.as_str(),
232        )
233        .await
234        {
235            error!(err; "Failed to sync regions for table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
236        }
237    }
238
239    /// Update table metadata.
240    pub(crate) async fn on_update_metadata(&mut self) -> Result<Status> {
241        let table_id = self.data.table_id();
242        let table_ref = self.data.table_ref();
243        // Safety: checked before.
244        let table_info_value = self.data.table_info_value.as_ref().unwrap();
245        // Gets the table info from the cache or builds it.
246        let new_info = match &self.new_table_info {
247            Some(cached) => cached.clone(),
248            None => self.build_new_table_info(&table_info_value.table_info)
249                .inspect_err(|e| {
250                    // We already check the table info in the prepare step so this should not happen.
251                    error!(e; "Unable to build info for table {} in update metadata step, table_id: {}", table_ref, table_id);
252                })?,
253        };
254
255        debug!(
256            "Starting update table: {} metadata, new table info {:?}",
257            table_ref.to_string(),
258            new_info
259        );
260
261        // Safety: Checked in `AlterTableProcedure::new`.
262        let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
263        if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind {
264            self.on_update_metadata_for_rename(new_table_name.to_string(), table_info_value)
265                .await?;
266        } else {
267            // region distribution is set in submit_alter_region_requests
268            let region_distribution = self.data.region_distribution.as_ref().unwrap().clone();
269            self.on_update_metadata_for_alter(
270                new_info.into(),
271                region_distribution,
272                table_info_value,
273            )
274            .await?;
275        }
276
277        info!("Updated table metadata for table {table_ref}, table_id: {table_id}, kind: {alter_kind:?}");
278        self.data.state = AlterTableState::InvalidateTableCache;
279        Ok(Status::executing(true))
280    }
281
282    /// Broadcasts the invalidating table cache instructions.
283    async fn on_broadcast(&mut self) -> Result<Status> {
284        let cache_invalidator = &self.context.cache_invalidator;
285
286        cache_invalidator
287            .invalidate(
288                &Context::default(),
289                &[
290                    CacheIdent::TableId(self.data.table_id()),
291                    CacheIdent::TableName(self.data.table_ref().into()),
292                ],
293            )
294            .await?;
295
296        Ok(Status::done())
297    }
298
299    fn lock_key_inner(&self) -> Vec<StringKey> {
300        let mut lock_key = vec![];
301        let table_ref = self.data.table_ref();
302        let table_id = self.data.table_id();
303        lock_key.push(CatalogLock::Read(table_ref.catalog).into());
304        lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
305        lock_key.push(TableLock::Write(table_id).into());
306
307        // Safety: Checked in `AlterTableProcedure::new`.
308        let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap();
309        if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind {
310            lock_key.push(
311                TableNameLock::new(table_ref.catalog, table_ref.schema, new_table_name).into(),
312            )
313        }
314
315        lock_key
316    }
317}
318
319#[async_trait]
320impl Procedure for AlterTableProcedure {
321    fn type_name(&self) -> &str {
322        Self::TYPE_NAME
323    }
324
325    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
326        let error_handler = |e: Error| {
327            if e.is_retry_later() {
328                ProcedureError::retry_later(e)
329            } else if e.need_clean_poisons() {
330                ProcedureError::external_and_clean_poisons(e)
331            } else {
332                ProcedureError::external(e)
333            }
334        };
335
336        let state = &self.data.state;
337
338        let step = state.as_ref();
339
340        let _timer = metrics::METRIC_META_PROCEDURE_ALTER_TABLE
341            .with_label_values(&[step])
342            .start_timer();
343
344        match state {
345            AlterTableState::Prepare => self.on_prepare().await,
346            AlterTableState::SubmitAlterRegionRequests => {
347                self.submit_alter_region_requests(ctx.procedure_id, ctx.provider.as_ref())
348                    .await
349            }
350            AlterTableState::UpdateMetadata => self.on_update_metadata().await,
351            AlterTableState::InvalidateTableCache => self.on_broadcast().await,
352        }
353        .map_err(error_handler)
354    }
355
356    fn dump(&self) -> ProcedureResult<String> {
357        serde_json::to_string(&self.data).context(ToJsonSnafu)
358    }
359
360    fn lock_key(&self) -> LockKey {
361        let key = self.lock_key_inner();
362
363        LockKey::new(key)
364    }
365
366    fn poison_keys(&self) -> PoisonKeys {
367        PoisonKeys::new(vec![self.table_poison_key()])
368    }
369}
370
/// The steps of the alter table procedure.
///
/// The state is part of the serialized procedure data, and its `AsRefStr` form is
/// used as the metric label in `execute`, so renaming a variant affects both.
#[derive(Debug, Serialize, Deserialize, AsRefStr)]
enum AlterTableState {
    /// Prepares to alter the table.
    /// Next state: `UpdateMetadata` for a rename, `SubmitAlterRegionRequests` otherwise.
    Prepare,
    /// Sends alter region requests to Datanode.
    /// Next state: `UpdateMetadata` once all requests succeeded.
    SubmitAlterRegionRequests,
    /// Updates table metadata.
    /// Next state: `InvalidateTableCache`.
    UpdateMetadata,
    /// Broadcasts the invalidating table cache instruction.
    /// This is the last step; the procedure is done afterwards.
    InvalidateTableCache,
}
382
/// The serialized data of alter table.
///
/// This is what `AlterTableProcedure::dump` writes and
/// `AlterTableProcedure::from_json` reads back.
#[derive(Debug, Serialize, Deserialize)]
pub struct AlterTableData {
    /// The current step of the procedure.
    state: AlterTableState,
    /// The alter table task to execute.
    task: AlterTableTask,
    /// The id of the table to alter.
    table_id: TableId,
    /// Table info value before alteration.
    /// `None` until the prepare step; presumably populated by `fill_table_info()`,
    /// which is defined outside this file — TODO confirm.
    table_info_value: Option<DeserializedValueWithBytes<TableInfoValue>>,
    /// Region distribution for table in case we need to update region options.
    /// Set in `submit_alter_region_requests` and read in `on_update_metadata`.
    region_distribution: Option<RegionDistribution>,
}
394
395impl AlterTableData {
396    pub fn new(task: AlterTableTask, table_id: TableId) -> Self {
397        Self {
398            state: AlterTableState::Prepare,
399            task,
400            table_id,
401            table_info_value: None,
402            region_distribution: None,
403        }
404    }
405
406    fn table_ref(&self) -> TableReference {
407        self.task.table_ref()
408    }
409
410    fn table_id(&self) -> TableId {
411        self.table_id
412    }
413
414    fn table_info(&self) -> Option<&RawTableInfo> {
415        self.table_info_value
416            .as_ref()
417            .map(|value| &value.table_info)
418    }
419}