// common_meta/ddl/alter_logical_tables.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod check;
16mod metadata;
17mod region_request;
18mod table_cache_keys;
19mod update_metadata;
20
21use api::region::RegionResponse;
22use async_trait::async_trait;
23use common_catalog::format_full_table_name;
24use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
25use common_procedure::{Context, LockKey, Procedure, Status};
26use common_telemetry::{error, info, warn};
27use futures_util::future;
28use serde::{Deserialize, Serialize};
29use snafu::{ensure, ResultExt};
30use store_api::metadata::ColumnMetadata;
31use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
32use strum::AsRefStr;
33use table::metadata::TableId;
34
35use crate::ddl::utils::{
36    add_peer_context_if_needed, map_to_procedure_error, sync_follower_regions,
37};
38use crate::ddl::DdlContext;
39use crate::error::{DecodeJsonSnafu, MetadataCorruptionSnafu, Result};
40use crate::instruction::CacheIdent;
41use crate::key::table_info::TableInfoValue;
42use crate::key::table_route::PhysicalTableRouteValue;
43use crate::key::DeserializedValueWithBytes;
44use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
45use crate::metrics;
46use crate::rpc::ddl::AlterTableTask;
47use crate::rpc::router::{find_leaders, RegionRoute};
48
/// Procedure that applies a batch of [`AlterTableTask`]s to logical tables, sending the
/// resulting region requests to the region leaders of a single physical table.
pub struct AlterLogicalTablesProcedure {
    /// DDL context shared by every step (e.g. `node_manager`, `cache_invalidator`).
    pub context: DdlContext,
    /// Procedure state; this is what `dump` serializes and `from_json` restores.
    pub data: AlterTablesData,
}
53
54impl AlterLogicalTablesProcedure {
55    pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterLogicalTables";
56
57    pub fn new(
58        tasks: Vec<AlterTableTask>,
59        physical_table_id: TableId,
60        context: DdlContext,
61    ) -> Self {
62        Self {
63            context,
64            data: AlterTablesData {
65                state: AlterTablesState::Prepare,
66                tasks,
67                table_info_values: vec![],
68                physical_table_id,
69                physical_table_info: None,
70                physical_table_route: None,
71                physical_columns: vec![],
72                table_cache_keys_to_invalidate: vec![],
73            },
74        }
75    }
76
77    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
78        let data = serde_json::from_str(json).context(FromJsonSnafu)?;
79        Ok(Self { context, data })
80    }
81
82    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
83        // Checks all the tasks
84        self.check_input_tasks()?;
85        // Fills the table info values
86        self.fill_table_info_values().await?;
87        // Checks the physical table, must after [fill_table_info_values]
88        self.check_physical_table().await?;
89        // Fills the physical table info
90        self.fill_physical_table_info().await?;
91        // Filter the finished tasks
92        let finished_tasks = self.check_finished_tasks()?;
93        let already_finished_count = finished_tasks
94            .iter()
95            .map(|x| if *x { 1 } else { 0 })
96            .sum::<usize>();
97        let apply_tasks_count = self.data.tasks.len();
98        if already_finished_count == apply_tasks_count {
99            info!("All the alter tasks are finished, will skip the procedure.");
100            // Re-invalidate the table cache
101            self.data.state = AlterTablesState::InvalidateTableCache;
102            return Ok(Status::executing(true));
103        } else if already_finished_count > 0 {
104            info!(
105                "There are {} alter tasks, {} of them were already finished.",
106                apply_tasks_count, already_finished_count
107            );
108        }
109        self.filter_task(&finished_tasks)?;
110
111        // Next state
112        self.data.state = AlterTablesState::SubmitAlterRegionRequests;
113        Ok(Status::executing(true))
114    }
115
116    pub(crate) async fn on_submit_alter_region_requests(&mut self) -> Result<Status> {
117        // Safety: we have checked the state in on_prepare
118        let physical_table_route = &self.data.physical_table_route.as_ref().unwrap();
119        let leaders = find_leaders(&physical_table_route.region_routes);
120        let mut alter_region_tasks = Vec::with_capacity(leaders.len());
121
122        for peer in leaders {
123            let requester = self.context.node_manager.datanode(&peer).await;
124            let request = self.make_request(&peer, &physical_table_route.region_routes)?;
125
126            alter_region_tasks.push(async move {
127                requester
128                    .handle(request)
129                    .await
130                    .map_err(add_peer_context_if_needed(peer))
131            });
132        }
133
134        let mut results = future::join_all(alter_region_tasks)
135            .await
136            .into_iter()
137            .collect::<Result<Vec<_>>>()?;
138
139        // Collects responses from datanodes.
140        let phy_raw_schemas = results
141            .iter_mut()
142            .map(|res| res.extensions.remove(ALTER_PHYSICAL_EXTENSION_KEY))
143            .collect::<Vec<_>>();
144
145        if phy_raw_schemas.is_empty() {
146            self.submit_sync_region_requests(results, &physical_table_route.region_routes)
147                .await;
148            self.data.state = AlterTablesState::UpdateMetadata;
149            return Ok(Status::executing(true));
150        }
151
152        // Verify all the physical schemas are the same
153        // Safety: previous check ensures this vec is not empty
154        let first = phy_raw_schemas.first().unwrap();
155        ensure!(
156            phy_raw_schemas.iter().all(|x| x == first),
157            MetadataCorruptionSnafu {
158                err_msg: "The physical schemas from datanodes are not the same."
159            }
160        );
161
162        // Decodes the physical raw schemas
163        if let Some(phy_raw_schema) = first {
164            self.data.physical_columns =
165                ColumnMetadata::decode_list(phy_raw_schema).context(DecodeJsonSnafu)?;
166        } else {
167            warn!("altering logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged");
168        }
169
170        self.submit_sync_region_requests(results, &physical_table_route.region_routes)
171            .await;
172        self.data.state = AlterTablesState::UpdateMetadata;
173        Ok(Status::executing(true))
174    }
175
176    async fn submit_sync_region_requests(
177        &self,
178        results: Vec<RegionResponse>,
179        region_routes: &[RegionRoute],
180    ) {
181        let table_info = &self.data.physical_table_info.as_ref().unwrap().table_info;
182        if let Err(err) = sync_follower_regions(
183            &self.context,
184            self.data.physical_table_id,
185            results,
186            region_routes,
187            table_info.meta.engine.as_str(),
188        )
189        .await
190        {
191            error!(err; "Failed to sync regions for table {}, table_id: {}",
192                        format_full_table_name(&table_info.catalog_name, &table_info.schema_name, &table_info.name),
193                        self.data.physical_table_id
194            );
195        }
196    }
197
198    pub(crate) async fn on_update_metadata(&mut self) -> Result<Status> {
199        self.update_physical_table_metadata().await?;
200        self.update_logical_tables_metadata().await?;
201
202        self.data.build_cache_keys_to_invalidate();
203        self.data.clear_metadata_fields();
204
205        self.data.state = AlterTablesState::InvalidateTableCache;
206        Ok(Status::executing(true))
207    }
208
209    pub(crate) async fn on_invalidate_table_cache(&mut self) -> Result<Status> {
210        let to_invalidate = &self.data.table_cache_keys_to_invalidate;
211
212        self.context
213            .cache_invalidator
214            .invalidate(&Default::default(), to_invalidate)
215            .await?;
216        Ok(Status::done())
217    }
218}
219
220#[async_trait]
221impl Procedure for AlterLogicalTablesProcedure {
222    fn type_name(&self) -> &str {
223        Self::TYPE_NAME
224    }
225
226    async fn execute(&mut self, _ctx: &Context) -> ProcedureResult<Status> {
227        let state = &self.data.state;
228
229        let step = state.as_ref();
230
231        let _timer = metrics::METRIC_META_PROCEDURE_ALTER_TABLE
232            .with_label_values(&[step])
233            .start_timer();
234
235        match state {
236            AlterTablesState::Prepare => self.on_prepare().await,
237            AlterTablesState::SubmitAlterRegionRequests => {
238                self.on_submit_alter_region_requests().await
239            }
240            AlterTablesState::UpdateMetadata => self.on_update_metadata().await,
241            AlterTablesState::InvalidateTableCache => self.on_invalidate_table_cache().await,
242        }
243        .map_err(map_to_procedure_error)
244    }
245
246    fn dump(&self) -> ProcedureResult<String> {
247        serde_json::to_string(&self.data).context(ToJsonSnafu)
248    }
249
250    fn lock_key(&self) -> LockKey {
251        // CatalogLock, SchemaLock,
252        // TableLock
253        // TableNameLock(s)
254        let mut lock_key = Vec::with_capacity(2 + 1 + self.data.tasks.len());
255        let table_ref = self.data.tasks[0].table_ref();
256        lock_key.push(CatalogLock::Read(table_ref.catalog).into());
257        lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
258        lock_key.push(TableLock::Write(self.data.physical_table_id).into());
259        lock_key.extend(
260            self.data
261                .table_info_values
262                .iter()
263                .map(|table| TableLock::Write(table.table_info.ident.table_id).into()),
264        );
265
266        LockKey::new(lock_key)
267    }
268}
269
/// Serializable state of [`AlterLogicalTablesProcedure`].
///
/// `dump` serializes this struct with `serde_json` and `from_json` restores it, so the field
/// names are part of the persisted procedure format.
#[derive(Debug, Serialize, Deserialize)]
pub struct AlterTablesData {
    /// Current step of the procedure.
    state: AlterTablesState,
    /// Alter tasks to apply. Finished tasks are filtered out in `on_prepare`, and the list is
    /// emptied by `clear_metadata_fields` after the metadata update.
    tasks: Vec<AlterTableTask>,
    /// Table info values before the alter operation.
    /// Corresponding one-to-one with the AlterTableTask in tasks.
    table_info_values: Vec<DeserializedValueWithBytes<TableInfoValue>>,
    /// Id of the physical table; reset to 0 by `clear_metadata_fields`.
    physical_table_id: TableId,
    /// Physical table info, loaded during `on_prepare`.
    physical_table_info: Option<DeserializedValueWithBytes<TableInfoValue>>,
    /// Route of the physical table; its region routes decide which datanodes get requests.
    physical_table_route: Option<PhysicalTableRouteValue>,
    /// Physical table columns decoded from the `ALTER_PHYSICAL_EXTENSION_KEY` extension of the
    /// datanode responses (left empty if no response carried it).
    physical_columns: Vec<ColumnMetadata>,
    /// Cache entries invalidated by the final `InvalidateTableCache` step.
    table_cache_keys_to_invalidate: Vec<CacheIdent>,
}
284
285impl AlterTablesData {
286    /// Clears all data fields except `state` and `table_cache_keys_to_invalidate` after metadata update.
287    /// This is done to avoid persisting unnecessary data after the update metadata step.
288    fn clear_metadata_fields(&mut self) {
289        self.tasks.clear();
290        self.table_info_values.clear();
291        self.physical_table_id = 0;
292        self.physical_table_info = None;
293        self.physical_table_route = None;
294        self.physical_columns.clear();
295    }
296}
297
/// Steps of [`AlterLogicalTablesProcedure`], normally executed in declaration order. When every
/// task is already finished, `Prepare` jumps directly to `InvalidateTableCache`.
///
/// Variant names are persisted through serde and used as metric labels through `AsRefStr`
/// (see `execute`), so renaming a variant affects both.
#[derive(Debug, Serialize, Deserialize, AsRefStr)]
enum AlterTablesState {
    /// Prepares to alter the table: validates the tasks and loads the required metadata.
    Prepare,
    /// Sends the alter requests to the region leaders of the physical table.
    SubmitAlterRegionRequests,
    /// Updates table metadata.
    UpdateMetadata,
    /// Broadcasts the invalidating table cache instruction.
    InvalidateTableCache,
}