common_meta/ddl/
alter_logical_tables.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod executor;
16mod update_metadata;
17mod validator;
18
19use api::region::RegionResponse;
20use async_trait::async_trait;
21use common_catalog::format_full_table_name;
22use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
23use common_procedure::{Context, LockKey, Procedure, Status};
24use common_telemetry::{debug, error, info, warn};
25pub use executor::make_alter_region_request;
26use serde::{Deserialize, Serialize};
27use snafu::ResultExt;
28use store_api::metadata::ColumnMetadata;
29use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
30use strum::AsRefStr;
31use table::metadata::TableId;
32
33use crate::cache_invalidator::Context as CacheContext;
34use crate::ddl::DdlContext;
35use crate::ddl::alter_logical_tables::executor::AlterLogicalTablesExecutor;
36use crate::ddl::alter_logical_tables::validator::{
37    AlterLogicalTableValidator, ValidatorResult, retain_unskipped,
38};
39use crate::ddl::utils::{extract_column_metadatas, map_to_procedure_error, sync_follower_regions};
40use crate::error::Result;
41use crate::instruction::CacheIdent;
42use crate::key::DeserializedValueWithBytes;
43use crate::key::table_info::TableInfoValue;
44use crate::key::table_route::PhysicalTableRouteValue;
45use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
46use crate::metrics;
47use crate::rpc::ddl::AlterTableTask;
48use crate::rpc::router::RegionRoute;
49
/// Procedure that applies a batch of [`AlterTableTask`]s to logical tables that share one
/// physical table (`data.physical_table_id`).
pub struct AlterLogicalTablesProcedure {
    /// DDL context; provides the table metadata manager, node manager and cache invalidator
    /// used by the individual steps.
    pub context: DdlContext,
    /// Procedure state; this is the only part serialized by `dump` and restored by `from_json`.
    pub data: AlterTablesData,
    /// Physical table route cache.
    ///
    /// Not persisted: it is `None` after `from_json`, is reset to `None` whenever a step
    /// returns an error, and is fetched again on demand.
    pub physical_table_route: Option<PhysicalTableRouteValue>,
}
56
57/// Builds the validator from the [`AlterTablesData`].
58fn build_validator_from_alter_table_data<'a>(
59    data: &'a AlterTablesData,
60) -> AlterLogicalTableValidator<'a> {
61    let phsycial_table_id = data.physical_table_id;
62    let alters = data
63        .tasks
64        .iter()
65        .map(|task| &task.alter_table)
66        .collect::<Vec<_>>();
67    AlterLogicalTableValidator::new(phsycial_table_id, alters)
68}
69
70/// Builds the executor from the [`AlterTablesData`].
71fn build_executor_from_alter_expr<'a>(data: &'a AlterTablesData) -> AlterLogicalTablesExecutor<'a> {
72    debug_assert_eq!(data.tasks.len(), data.table_info_values.len());
73    let alters = data
74        .tasks
75        .iter()
76        .zip(data.table_info_values.iter())
77        .map(|(task, table_info)| (table_info.table_info.ident.table_id, &task.alter_table))
78        .collect::<Vec<_>>();
79    AlterLogicalTablesExecutor::new(alters)
80}
81
82impl AlterLogicalTablesProcedure {
83    pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterLogicalTables";
84
85    pub fn new(
86        tasks: Vec<AlterTableTask>,
87        physical_table_id: TableId,
88        context: DdlContext,
89    ) -> Self {
90        Self {
91            context,
92            data: AlterTablesData {
93                state: AlterTablesState::Prepare,
94                tasks,
95                table_info_values: vec![],
96                physical_table_id,
97                physical_table_info: None,
98                physical_columns: vec![],
99                table_cache_keys_to_invalidate: vec![],
100            },
101            physical_table_route: None,
102        }
103    }
104
105    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
106        let data = serde_json::from_str(json).context(FromJsonSnafu)?;
107        Ok(Self {
108            context,
109            data,
110            physical_table_route: None,
111        })
112    }
113
114    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
115        let validator = build_validator_from_alter_table_data(&self.data);
116        let ValidatorResult {
117            num_skipped,
118            skip_alter,
119            table_info_values,
120            physical_table_info,
121            physical_table_route,
122        } = validator
123            .validate(&self.context.table_metadata_manager)
124            .await?;
125
126        let num_tasks = self.data.tasks.len();
127        if num_skipped == num_tasks {
128            info!("All the alter tasks are finished, will skip the procedure.");
129            let cache_ident_keys = AlterLogicalTablesExecutor::build_cache_ident_keys(
130                &physical_table_info,
131                &table_info_values
132                    .iter()
133                    .map(|v| v.get_inner_ref())
134                    .collect::<Vec<_>>(),
135            );
136            self.data.table_cache_keys_to_invalidate = cache_ident_keys;
137            // Re-invalidate the table cache
138            self.data.state = AlterTablesState::InvalidateTableCache;
139            return Ok(Status::executing(true));
140        } else if num_skipped > 0 {
141            info!(
142                "There are {} alter tasks, {} of them were already finished.",
143                num_tasks, num_skipped
144            );
145        }
146
147        // Updates the procedure state.
148        retain_unskipped(&mut self.data.tasks, &skip_alter);
149        self.data.physical_table_info = Some(physical_table_info);
150        self.data.table_info_values = table_info_values;
151        debug_assert_eq!(self.data.tasks.len(), self.data.table_info_values.len());
152        self.physical_table_route = Some(physical_table_route);
153        self.data.state = AlterTablesState::SubmitAlterRegionRequests;
154        Ok(Status::executing(true))
155    }
156
157    pub(crate) async fn on_submit_alter_region_requests(&mut self) -> Result<Status> {
158        self.fetch_physical_table_route_if_non_exist().await?;
159        // Safety: fetched in `fetch_physical_table_route_if_non_exist`.
160        let region_routes = &self.physical_table_route.as_ref().unwrap().region_routes;
161
162        let executor = build_executor_from_alter_expr(&self.data);
163        let mut results = executor
164            .on_alter_regions(
165                &self.context.node_manager,
166                // Avoid double-borrowing self by extracting the region_routes first
167                region_routes,
168            )
169            .await?;
170
171        if let Some(column_metadatas) =
172            extract_column_metadatas(&mut results, ALTER_PHYSICAL_EXTENSION_KEY)?
173        {
174            self.data.physical_columns = column_metadatas;
175        } else {
176            warn!(
177                "altering logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged"
178            );
179        }
180        self.submit_sync_region_requests(results, region_routes)
181            .await;
182        self.data.state = AlterTablesState::UpdateMetadata;
183        Ok(Status::executing(true))
184    }
185
186    async fn submit_sync_region_requests(
187        &self,
188        results: Vec<RegionResponse>,
189        region_routes: &[RegionRoute],
190    ) {
191        let table_info = &self.data.physical_table_info.as_ref().unwrap().table_info;
192        if let Err(err) = sync_follower_regions(
193            &self.context,
194            self.data.physical_table_id,
195            &results,
196            region_routes,
197            table_info.meta.engine.as_str(),
198        )
199        .await
200        {
201            error!(err; "Failed to sync regions for table {}, table_id: {}",
202                        format_full_table_name(&table_info.catalog_name, &table_info.schema_name, &table_info.name),
203                        self.data.physical_table_id
204            );
205        }
206    }
207
208    pub(crate) async fn on_update_metadata(&mut self) -> Result<Status> {
209        self.update_physical_table_metadata().await?;
210        self.update_logical_tables_metadata().await?;
211
212        let logical_table_info_values = self
213            .data
214            .table_info_values
215            .iter()
216            .map(|v| v.get_inner_ref())
217            .collect::<Vec<_>>();
218
219        let cache_ident_keys = AlterLogicalTablesExecutor::build_cache_ident_keys(
220            self.data.physical_table_info.as_ref().unwrap(),
221            &logical_table_info_values,
222        );
223        self.data.table_cache_keys_to_invalidate = cache_ident_keys;
224        self.data.clear_metadata_fields();
225
226        self.data.state = AlterTablesState::InvalidateTableCache;
227        Ok(Status::executing(true))
228    }
229
230    pub(crate) async fn on_invalidate_table_cache(&mut self) -> Result<Status> {
231        let to_invalidate = &self.data.table_cache_keys_to_invalidate;
232
233        let ctx = CacheContext {
234            subject: Some(format!(
235                "Invalidate table cache by altering logical tables, physical_table_id: {}",
236                self.data.physical_table_id,
237            )),
238        };
239
240        self.context
241            .cache_invalidator
242            .invalidate(&ctx, to_invalidate)
243            .await?;
244        Ok(Status::done())
245    }
246
247    /// Fetches the physical table route if it is not already fetched.
248    async fn fetch_physical_table_route_if_non_exist(&mut self) -> Result<()> {
249        if self.physical_table_route.is_none() {
250            let (_, physical_table_route) = self
251                .context
252                .table_metadata_manager
253                .table_route_manager()
254                .get_physical_table_route(self.data.physical_table_id)
255                .await?;
256            self.physical_table_route = Some(physical_table_route);
257        }
258
259        Ok(())
260    }
261}
262
263#[async_trait]
264impl Procedure for AlterLogicalTablesProcedure {
265    fn type_name(&self) -> &str {
266        Self::TYPE_NAME
267    }
268
269    async fn execute(&mut self, _ctx: &Context) -> ProcedureResult<Status> {
270        let state = &self.data.state;
271
272        let step = state.as_ref();
273
274        let _timer = metrics::METRIC_META_PROCEDURE_ALTER_TABLE
275            .with_label_values(&[step])
276            .start_timer();
277        debug!(
278            "Executing alter logical tables procedure, state: {:?}",
279            state
280        );
281
282        match state {
283            AlterTablesState::Prepare => self.on_prepare().await,
284            AlterTablesState::SubmitAlterRegionRequests => {
285                self.on_submit_alter_region_requests().await
286            }
287            AlterTablesState::UpdateMetadata => self.on_update_metadata().await,
288            AlterTablesState::InvalidateTableCache => self.on_invalidate_table_cache().await,
289        }
290        .inspect_err(|_| {
291            // Reset the physical table route cache.
292            self.physical_table_route = None;
293        })
294        .map_err(map_to_procedure_error)
295    }
296
297    fn dump(&self) -> ProcedureResult<String> {
298        serde_json::to_string(&self.data).context(ToJsonSnafu)
299    }
300
301    fn lock_key(&self) -> LockKey {
302        // CatalogLock, SchemaLock,
303        // TableLock
304        // TableNameLock(s)
305        let mut lock_key = Vec::with_capacity(2 + 1 + self.data.tasks.len());
306        let table_ref = self.data.tasks[0].table_ref();
307        lock_key.push(CatalogLock::Read(table_ref.catalog).into());
308        lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
309        lock_key.push(TableLock::Write(self.data.physical_table_id).into());
310        lock_key.extend(
311            self.data
312                .table_info_values
313                .iter()
314                .map(|table| TableLock::Write(table.table_info.ident.table_id).into()),
315        );
316
317        LockKey::new(lock_key)
318    }
319}
320
321#[derive(Debug, Serialize, Deserialize)]
322pub struct AlterTablesData {
323    state: AlterTablesState,
324    tasks: Vec<AlterTableTask>,
325    /// Table info values before the alter operation.
326    /// Corresponding one-to-one with the AlterTableTask in tasks.
327    table_info_values: Vec<DeserializedValueWithBytes<TableInfoValue>>,
328    /// Physical table info
329    physical_table_id: TableId,
330    physical_table_info: Option<DeserializedValueWithBytes<TableInfoValue>>,
331    physical_columns: Vec<ColumnMetadata>,
332    table_cache_keys_to_invalidate: Vec<CacheIdent>,
333}
334
335impl AlterTablesData {
336    /// Clears all data fields except `state` and `table_cache_keys_to_invalidate` after metadata update.
337    /// This is done to avoid persisting unnecessary data after the update metadata step.
338    fn clear_metadata_fields(&mut self) {
339        self.tasks.clear();
340        self.table_info_values.clear();
341        self.physical_table_id = 0;
342        self.physical_table_info = None;
343        self.physical_columns.clear();
344    }
345}
346
/// Steps of [`AlterLogicalTablesProcedure`], listed in the order they normally run.
///
/// `Prepare` may jump straight to `InvalidateTableCache` when every task is skipped.
/// Variant names are serialized into the procedure state and, via `AsRefStr`, used as the
/// metric label in `execute`. Renaming a variant is therefore a compatibility change.
#[derive(Debug, Serialize, Deserialize, AsRefStr)]
enum AlterTablesState {
    /// Prepares to alter the table
    Prepare,
    /// Sends the alter requests to the physical table's regions and syncs follower regions.
    SubmitAlterRegionRequests,
    /// Updates table metadata.
    UpdateMetadata,
    /// Broadcasts the invalidating table cache instruction.
    InvalidateTableCache,
}