common_meta/ddl/
alter_logical_tables.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod check;
16mod metadata;
17mod region_request;
18mod table_cache_keys;
19mod update_metadata;
20
21use api::region::RegionResponse;
22use async_trait::async_trait;
23use common_catalog::format_full_table_name;
24use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
25use common_procedure::{Context, LockKey, Procedure, Status};
26use common_telemetry::{error, info, warn};
27use futures_util::future;
28pub use region_request::make_alter_region_request;
29use serde::{Deserialize, Serialize};
30use snafu::{ensure, ResultExt};
31use store_api::metadata::ColumnMetadata;
32use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
33use strum::AsRefStr;
34use table::metadata::TableId;
35
36use crate::ddl::utils::{
37    add_peer_context_if_needed, map_to_procedure_error, sync_follower_regions,
38};
39use crate::ddl::DdlContext;
40use crate::error::{DecodeJsonSnafu, MetadataCorruptionSnafu, Result};
41use crate::instruction::CacheIdent;
42use crate::key::table_info::TableInfoValue;
43use crate::key::table_route::PhysicalTableRouteValue;
44use crate::key::DeserializedValueWithBytes;
45use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
46use crate::metrics;
47use crate::rpc::ddl::AlterTableTask;
48use crate::rpc::router::{find_leaders, RegionRoute};
49
/// Procedure that alters a batch of logical tables which share one physical table.
pub struct AlterLogicalTablesProcedure {
    /// DDL dependencies used by the steps (node manager, cache invalidator, ...).
    pub context: DdlContext,
    /// Serializable procedure state; this is what `dump` persists.
    pub data: AlterTablesData,
}
54
55impl AlterLogicalTablesProcedure {
56    pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterLogicalTables";
57
58    pub fn new(
59        tasks: Vec<AlterTableTask>,
60        physical_table_id: TableId,
61        context: DdlContext,
62    ) -> Self {
63        Self {
64            context,
65            data: AlterTablesData {
66                state: AlterTablesState::Prepare,
67                tasks,
68                table_info_values: vec![],
69                physical_table_id,
70                physical_table_info: None,
71                physical_table_route: None,
72                physical_columns: vec![],
73                table_cache_keys_to_invalidate: vec![],
74            },
75        }
76    }
77
78    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
79        let data = serde_json::from_str(json).context(FromJsonSnafu)?;
80        Ok(Self { context, data })
81    }
82
83    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
84        // Checks all the tasks
85        self.check_input_tasks()?;
86        // Fills the table info values
87        self.fill_table_info_values().await?;
88        // Checks the physical table, must after [fill_table_info_values]
89        self.check_physical_table().await?;
90        // Fills the physical table info
91        self.fill_physical_table_info().await?;
92        // Filter the finished tasks
93        let finished_tasks = self.check_finished_tasks()?;
94        let already_finished_count = finished_tasks
95            .iter()
96            .map(|x| if *x { 1 } else { 0 })
97            .sum::<usize>();
98        let apply_tasks_count = self.data.tasks.len();
99        if already_finished_count == apply_tasks_count {
100            info!("All the alter tasks are finished, will skip the procedure.");
101            // Re-invalidate the table cache
102            self.data.state = AlterTablesState::InvalidateTableCache;
103            return Ok(Status::executing(true));
104        } else if already_finished_count > 0 {
105            info!(
106                "There are {} alter tasks, {} of them were already finished.",
107                apply_tasks_count, already_finished_count
108            );
109        }
110        self.filter_task(&finished_tasks)?;
111
112        // Next state
113        self.data.state = AlterTablesState::SubmitAlterRegionRequests;
114        Ok(Status::executing(true))
115    }
116
117    pub(crate) async fn on_submit_alter_region_requests(&mut self) -> Result<Status> {
118        // Safety: we have checked the state in on_prepare
119        let physical_table_route = &self.data.physical_table_route.as_ref().unwrap();
120        let leaders = find_leaders(&physical_table_route.region_routes);
121        let mut alter_region_tasks = Vec::with_capacity(leaders.len());
122
123        for peer in leaders {
124            let requester = self.context.node_manager.datanode(&peer).await;
125            let request = self.make_request(&peer, &physical_table_route.region_routes)?;
126
127            alter_region_tasks.push(async move {
128                requester
129                    .handle(request)
130                    .await
131                    .map_err(add_peer_context_if_needed(peer))
132            });
133        }
134
135        let mut results = future::join_all(alter_region_tasks)
136            .await
137            .into_iter()
138            .collect::<Result<Vec<_>>>()?;
139
140        // Collects responses from datanodes.
141        let phy_raw_schemas = results
142            .iter_mut()
143            .map(|res| res.extensions.remove(ALTER_PHYSICAL_EXTENSION_KEY))
144            .collect::<Vec<_>>();
145
146        if phy_raw_schemas.is_empty() {
147            self.submit_sync_region_requests(results, &physical_table_route.region_routes)
148                .await;
149            self.data.state = AlterTablesState::UpdateMetadata;
150            return Ok(Status::executing(true));
151        }
152
153        // Verify all the physical schemas are the same
154        // Safety: previous check ensures this vec is not empty
155        let first = phy_raw_schemas.first().unwrap();
156        ensure!(
157            phy_raw_schemas.iter().all(|x| x == first),
158            MetadataCorruptionSnafu {
159                err_msg: "The physical schemas from datanodes are not the same."
160            }
161        );
162
163        // Decodes the physical raw schemas
164        if let Some(phy_raw_schema) = first {
165            self.data.physical_columns =
166                ColumnMetadata::decode_list(phy_raw_schema).context(DecodeJsonSnafu)?;
167        } else {
168            warn!("altering logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged");
169        }
170
171        self.submit_sync_region_requests(results, &physical_table_route.region_routes)
172            .await;
173        self.data.state = AlterTablesState::UpdateMetadata;
174        Ok(Status::executing(true))
175    }
176
177    async fn submit_sync_region_requests(
178        &self,
179        results: Vec<RegionResponse>,
180        region_routes: &[RegionRoute],
181    ) {
182        let table_info = &self.data.physical_table_info.as_ref().unwrap().table_info;
183        if let Err(err) = sync_follower_regions(
184            &self.context,
185            self.data.physical_table_id,
186            results,
187            region_routes,
188            table_info.meta.engine.as_str(),
189        )
190        .await
191        {
192            error!(err; "Failed to sync regions for table {}, table_id: {}",
193                        format_full_table_name(&table_info.catalog_name, &table_info.schema_name, &table_info.name),
194                        self.data.physical_table_id
195            );
196        }
197    }
198
199    pub(crate) async fn on_update_metadata(&mut self) -> Result<Status> {
200        self.update_physical_table_metadata().await?;
201        self.update_logical_tables_metadata().await?;
202
203        self.data.build_cache_keys_to_invalidate();
204        self.data.clear_metadata_fields();
205
206        self.data.state = AlterTablesState::InvalidateTableCache;
207        Ok(Status::executing(true))
208    }
209
210    pub(crate) async fn on_invalidate_table_cache(&mut self) -> Result<Status> {
211        let to_invalidate = &self.data.table_cache_keys_to_invalidate;
212
213        self.context
214            .cache_invalidator
215            .invalidate(&Default::default(), to_invalidate)
216            .await?;
217        Ok(Status::done())
218    }
219}
220
221#[async_trait]
222impl Procedure for AlterLogicalTablesProcedure {
223    fn type_name(&self) -> &str {
224        Self::TYPE_NAME
225    }
226
227    async fn execute(&mut self, _ctx: &Context) -> ProcedureResult<Status> {
228        let state = &self.data.state;
229
230        let step = state.as_ref();
231
232        let _timer = metrics::METRIC_META_PROCEDURE_ALTER_TABLE
233            .with_label_values(&[step])
234            .start_timer();
235
236        match state {
237            AlterTablesState::Prepare => self.on_prepare().await,
238            AlterTablesState::SubmitAlterRegionRequests => {
239                self.on_submit_alter_region_requests().await
240            }
241            AlterTablesState::UpdateMetadata => self.on_update_metadata().await,
242            AlterTablesState::InvalidateTableCache => self.on_invalidate_table_cache().await,
243        }
244        .map_err(map_to_procedure_error)
245    }
246
247    fn dump(&self) -> ProcedureResult<String> {
248        serde_json::to_string(&self.data).context(ToJsonSnafu)
249    }
250
251    fn lock_key(&self) -> LockKey {
252        // CatalogLock, SchemaLock,
253        // TableLock
254        // TableNameLock(s)
255        let mut lock_key = Vec::with_capacity(2 + 1 + self.data.tasks.len());
256        let table_ref = self.data.tasks[0].table_ref();
257        lock_key.push(CatalogLock::Read(table_ref.catalog).into());
258        lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
259        lock_key.push(TableLock::Write(self.data.physical_table_id).into());
260        lock_key.extend(
261            self.data
262                .table_info_values
263                .iter()
264                .map(|table| TableLock::Write(table.table_info.ident.table_id).into()),
265        );
266
267        LockKey::new(lock_key)
268    }
269}
270
/// Persisted state of [`AlterLogicalTablesProcedure`]: `dump` serializes it to JSON and
/// `from_json` restores it. Field names and order therefore define the stored format.
#[derive(Debug, Serialize, Deserialize)]
pub struct AlterTablesData {
    /// Step to run on the next `execute` call.
    state: AlterTablesState,
    /// Alter tasks to apply; narrowed by `filter_task` during prepare and cleared
    /// after the metadata update.
    tasks: Vec<AlterTableTask>,
    /// Table info values before the alter operation.
    /// Corresponding one-to-one with the AlterTableTask in tasks.
    table_info_values: Vec<DeserializedValueWithBytes<TableInfoValue>>,
    /// Id of the physical table backing the logical tables (reset to 0 once the
    /// metadata fields are cleared).
    physical_table_id: TableId,
    /// Physical table info
    physical_table_info: Option<DeserializedValueWithBytes<TableInfoValue>>,
    /// Route of the physical table; its region routes determine which leaders receive
    /// the alter requests.
    physical_table_route: Option<PhysicalTableRouteValue>,
    /// Physical table columns decoded from the datanodes' responses, if they reported any.
    physical_columns: Vec<ColumnMetadata>,
    /// Cache entries invalidated in the final step.
    table_cache_keys_to_invalidate: Vec<CacheIdent>,
}
285
286impl AlterTablesData {
287    /// Clears all data fields except `state` and `table_cache_keys_to_invalidate` after metadata update.
288    /// This is done to avoid persisting unnecessary data after the update metadata step.
289    fn clear_metadata_fields(&mut self) {
290        self.tasks.clear();
291        self.table_info_values.clear();
292        self.physical_table_id = 0;
293        self.physical_table_info = None;
294        self.physical_table_route = None;
295        self.physical_columns.clear();
296    }
297}
298
/// Steps of [`AlterLogicalTablesProcedure`]. They normally run in declaration order,
/// but `Prepare` jumps straight to `InvalidateTableCache` when every task is already
/// finished. The variant name (via `AsRefStr`) is also the metric label for the step.
#[derive(Debug, Serialize, Deserialize, AsRefStr)]
enum AlterTablesState {
    /// Prepares to alter the table
    Prepare,
    /// Sends alter region requests to the leader peers of the physical table's regions.
    SubmitAlterRegionRequests,
    /// Updates table metadata.
    UpdateMetadata,
    /// Broadcasts the invalidating table cache instruction.
    InvalidateTableCache,
}