common_meta/ddl/
alter_logical_tables.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15mod check;
16mod metadata;
17mod region_request;
18mod table_cache_keys;
19mod update_metadata;
20
21use api::region::RegionResponse;
22use async_trait::async_trait;
23use common_catalog::format_full_table_name;
24use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
25use common_procedure::{Context, LockKey, Procedure, Status};
26use common_telemetry::{error, info, warn};
27use futures_util::future;
28use serde::{Deserialize, Serialize};
29use snafu::{ensure, ResultExt};
30use store_api::metadata::ColumnMetadata;
31use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
32use strum::AsRefStr;
33use table::metadata::TableId;
34
35use crate::ddl::utils::{add_peer_context_if_needed, sync_follower_regions};
36use crate::ddl::DdlContext;
37use crate::error::{DecodeJsonSnafu, Error, MetadataCorruptionSnafu, Result};
38use crate::key::table_info::TableInfoValue;
39use crate::key::table_route::PhysicalTableRouteValue;
40use crate::key::DeserializedValueWithBytes;
41use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
42use crate::metrics;
43use crate::rpc::ddl::AlterTableTask;
44use crate::rpc::router::{find_leaders, RegionRoute};
45
46pub struct AlterLogicalTablesProcedure {
47    pub context: DdlContext,
48    pub data: AlterTablesData,
49}
50
51impl AlterLogicalTablesProcedure {
52    pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterLogicalTables";
53
54    pub fn new(
55        tasks: Vec<AlterTableTask>,
56        physical_table_id: TableId,
57        context: DdlContext,
58    ) -> Self {
59        Self {
60            context,
61            data: AlterTablesData {
62                state: AlterTablesState::Prepare,
63                tasks,
64                table_info_values: vec![],
65                physical_table_id,
66                physical_table_info: None,
67                physical_table_route: None,
68                physical_columns: vec![],
69            },
70        }
71    }
72
73    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
74        let data = serde_json::from_str(json).context(FromJsonSnafu)?;
75        Ok(Self { context, data })
76    }
77
78    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
79        // Checks all the tasks
80        self.check_input_tasks()?;
81        // Fills the table info values
82        self.fill_table_info_values().await?;
83        // Checks the physical table, must after [fill_table_info_values]
84        self.check_physical_table().await?;
85        // Fills the physical table info
86        self.fill_physical_table_info().await?;
87        // Filter the finished tasks
88        let finished_tasks = self.check_finished_tasks()?;
89        let already_finished_count = finished_tasks
90            .iter()
91            .map(|x| if *x { 1 } else { 0 })
92            .sum::<usize>();
93        let apply_tasks_count = self.data.tasks.len();
94        if already_finished_count == apply_tasks_count {
95            info!("All the alter tasks are finished, will skip the procedure.");
96            // Re-invalidate the table cache
97            self.data.state = AlterTablesState::InvalidateTableCache;
98            return Ok(Status::executing(true));
99        } else if already_finished_count > 0 {
100            info!(
101                "There are {} alter tasks, {} of them were already finished.",
102                apply_tasks_count, already_finished_count
103            );
104        }
105        self.filter_task(&finished_tasks)?;
106
107        // Next state
108        self.data.state = AlterTablesState::SubmitAlterRegionRequests;
109        Ok(Status::executing(true))
110    }
111
112    pub(crate) async fn on_submit_alter_region_requests(&mut self) -> Result<Status> {
113        // Safety: we have checked the state in on_prepare
114        let physical_table_route = &self.data.physical_table_route.as_ref().unwrap();
115        let leaders = find_leaders(&physical_table_route.region_routes);
116        let mut alter_region_tasks = Vec::with_capacity(leaders.len());
117
118        for peer in leaders {
119            let requester = self.context.node_manager.datanode(&peer).await;
120            let request = self.make_request(&peer, &physical_table_route.region_routes)?;
121
122            alter_region_tasks.push(async move {
123                requester
124                    .handle(request)
125                    .await
126                    .map_err(add_peer_context_if_needed(peer))
127            });
128        }
129
130        let mut results = future::join_all(alter_region_tasks)
131            .await
132            .into_iter()
133            .collect::<Result<Vec<_>>>()?;
134
135        // Collects responses from datanodes.
136        let phy_raw_schemas = results
137            .iter_mut()
138            .map(|res| res.extensions.remove(ALTER_PHYSICAL_EXTENSION_KEY))
139            .collect::<Vec<_>>();
140
141        if phy_raw_schemas.is_empty() {
142            self.submit_sync_region_requests(results, &physical_table_route.region_routes)
143                .await;
144            self.data.state = AlterTablesState::UpdateMetadata;
145            return Ok(Status::executing(true));
146        }
147
148        // Verify all the physical schemas are the same
149        // Safety: previous check ensures this vec is not empty
150        let first = phy_raw_schemas.first().unwrap();
151        ensure!(
152            phy_raw_schemas.iter().all(|x| x == first),
153            MetadataCorruptionSnafu {
154                err_msg: "The physical schemas from datanodes are not the same."
155            }
156        );
157
158        // Decodes the physical raw schemas
159        if let Some(phy_raw_schema) = first {
160            self.data.physical_columns =
161                ColumnMetadata::decode_list(phy_raw_schema).context(DecodeJsonSnafu)?;
162        } else {
163            warn!("altering logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged");
164        }
165
166        self.submit_sync_region_requests(results, &physical_table_route.region_routes)
167            .await;
168        self.data.state = AlterTablesState::UpdateMetadata;
169        Ok(Status::executing(true))
170    }
171
172    async fn submit_sync_region_requests(
173        &self,
174        results: Vec<RegionResponse>,
175        region_routes: &[RegionRoute],
176    ) {
177        let table_info = &self.data.physical_table_info.as_ref().unwrap().table_info;
178        if let Err(err) = sync_follower_regions(
179            &self.context,
180            self.data.physical_table_id,
181            results,
182            region_routes,
183            table_info.meta.engine.as_str(),
184        )
185        .await
186        {
187            error!(err; "Failed to sync regions for table {}, table_id: {}",
188                        format_full_table_name(&table_info.catalog_name, &table_info.schema_name, &table_info.name),
189                        self.data.physical_table_id
190            );
191        }
192    }
193
194    pub(crate) async fn on_update_metadata(&mut self) -> Result<Status> {
195        self.update_physical_table_metadata().await?;
196        self.update_logical_tables_metadata().await?;
197
198        self.data.state = AlterTablesState::InvalidateTableCache;
199        Ok(Status::executing(true))
200    }
201
202    pub(crate) async fn on_invalidate_table_cache(&mut self) -> Result<Status> {
203        let to_invalidate = self.build_table_cache_keys_to_invalidate();
204
205        self.context
206            .cache_invalidator
207            .invalidate(&Default::default(), &to_invalidate)
208            .await?;
209        Ok(Status::done())
210    }
211}
212
213#[async_trait]
214impl Procedure for AlterLogicalTablesProcedure {
215    fn type_name(&self) -> &str {
216        Self::TYPE_NAME
217    }
218
219    async fn execute(&mut self, _ctx: &Context) -> ProcedureResult<Status> {
220        let error_handler = |e: Error| {
221            if e.is_retry_later() {
222                common_procedure::Error::retry_later(e)
223            } else {
224                common_procedure::Error::external(e)
225            }
226        };
227
228        let state = &self.data.state;
229
230        let step = state.as_ref();
231
232        let _timer = metrics::METRIC_META_PROCEDURE_ALTER_TABLE
233            .with_label_values(&[step])
234            .start_timer();
235
236        match state {
237            AlterTablesState::Prepare => self.on_prepare().await,
238            AlterTablesState::SubmitAlterRegionRequests => {
239                self.on_submit_alter_region_requests().await
240            }
241            AlterTablesState::UpdateMetadata => self.on_update_metadata().await,
242            AlterTablesState::InvalidateTableCache => self.on_invalidate_table_cache().await,
243        }
244        .map_err(error_handler)
245    }
246
247    fn dump(&self) -> ProcedureResult<String> {
248        serde_json::to_string(&self.data).context(ToJsonSnafu)
249    }
250
251    fn lock_key(&self) -> LockKey {
252        // CatalogLock, SchemaLock,
253        // TableLock
254        // TableNameLock(s)
255        let mut lock_key = Vec::with_capacity(2 + 1 + self.data.tasks.len());
256        let table_ref = self.data.tasks[0].table_ref();
257        lock_key.push(CatalogLock::Read(table_ref.catalog).into());
258        lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
259        lock_key.push(TableLock::Write(self.data.physical_table_id).into());
260        lock_key.extend(
261            self.data
262                .table_info_values
263                .iter()
264                .map(|table| TableLock::Write(table.table_info.ident.table_id).into()),
265        );
266
267        LockKey::new(lock_key)
268    }
269}
270
271#[derive(Debug, Serialize, Deserialize)]
272pub struct AlterTablesData {
273    state: AlterTablesState,
274    tasks: Vec<AlterTableTask>,
275    /// Table info values before the alter operation.
276    /// Corresponding one-to-one with the AlterTableTask in tasks.
277    table_info_values: Vec<DeserializedValueWithBytes<TableInfoValue>>,
278    /// Physical table info
279    physical_table_id: TableId,
280    physical_table_info: Option<DeserializedValueWithBytes<TableInfoValue>>,
281    physical_table_route: Option<PhysicalTableRouteValue>,
282    physical_columns: Vec<ColumnMetadata>,
283}
284
285#[derive(Debug, Serialize, Deserialize, AsRefStr)]
286enum AlterTablesState {
287    /// Prepares to alter the table
288    Prepare,
289    SubmitAlterRegionRequests,
290    /// Updates table metadata.
291    UpdateMetadata,
292    /// Broadcasts the invalidating table cache instruction.
293    InvalidateTableCache,
294}