common_meta/ddl/
truncate_table.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::region::{
16    region_request, RegionRequest, RegionRequestHeader, TruncateRequest as PbTruncateRegionRequest,
17};
18use async_trait::async_trait;
19use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
20use common_procedure::{
21    Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
22};
23use common_telemetry::debug;
24use common_telemetry::tracing_context::TracingContext;
25use futures::future::join_all;
26use serde::{Deserialize, Serialize};
27use snafu::{ensure, ResultExt};
28use store_api::storage::RegionId;
29use strum::AsRefStr;
30use table::metadata::{RawTableInfo, TableId};
31use table::table_name::TableName;
32use table::table_reference::TableReference;
33
34use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error};
35use crate::ddl::DdlContext;
36use crate::error::{Result, TableNotFoundSnafu};
37use crate::key::table_info::TableInfoValue;
38use crate::key::table_name::TableNameKey;
39use crate::key::DeserializedValueWithBytes;
40use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
41use crate::metrics;
42use crate::rpc::ddl::TruncateTableTask;
43use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
44
45pub struct TruncateTableProcedure {
46    context: DdlContext,
47    data: TruncateTableData,
48}
49
50#[async_trait]
51impl Procedure for TruncateTableProcedure {
52    fn type_name(&self) -> &str {
53        Self::TYPE_NAME
54    }
55
56    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
57        let state = &self.data.state;
58
59        let _timer = metrics::METRIC_META_PROCEDURE_TRUNCATE_TABLE
60            .with_label_values(&[state.as_ref()])
61            .start_timer();
62
63        match self.data.state {
64            TruncateTableState::Prepare => self.on_prepare().await,
65            TruncateTableState::DatanodeTruncateRegions => {
66                self.on_datanode_truncate_regions().await
67            }
68        }
69        .map_err(handle_retry_error)
70    }
71
72    fn dump(&self) -> ProcedureResult<String> {
73        serde_json::to_string(&self.data).context(ToJsonSnafu)
74    }
75
76    fn lock_key(&self) -> LockKey {
77        let table_ref = &self.data.table_ref();
78        let table_id = self.data.table_id();
79        let lock_key = vec![
80            CatalogLock::Read(table_ref.catalog).into(),
81            SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
82            TableLock::Write(table_id).into(),
83        ];
84
85        LockKey::new(lock_key)
86    }
87}
88
89impl TruncateTableProcedure {
90    pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::TruncateTable";
91
92    pub(crate) fn new(
93        task: TruncateTableTask,
94        table_info_value: DeserializedValueWithBytes<TableInfoValue>,
95        region_routes: Vec<RegionRoute>,
96        context: DdlContext,
97    ) -> Self {
98        Self {
99            context,
100            data: TruncateTableData::new(task, table_info_value, region_routes),
101        }
102    }
103
104    pub(crate) fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
105        let data = serde_json::from_str(json).context(FromJsonSnafu)?;
106        Ok(Self { context, data })
107    }
108
109    // Checks whether the table exists.
110    async fn on_prepare(&mut self) -> Result<Status> {
111        let table_ref = &self.data.table_ref();
112
113        let manager = &self.context.table_metadata_manager;
114
115        let exist = manager
116            .table_name_manager()
117            .exists(TableNameKey::new(
118                table_ref.catalog,
119                table_ref.schema,
120                table_ref.table,
121            ))
122            .await?;
123
124        ensure!(
125            exist,
126            TableNotFoundSnafu {
127                table_name: table_ref.to_string()
128            }
129        );
130
131        self.data.state = TruncateTableState::DatanodeTruncateRegions;
132
133        Ok(Status::executing(true))
134    }
135
136    async fn on_datanode_truncate_regions(&mut self) -> Result<Status> {
137        let table_id = self.data.table_id();
138
139        let region_routes = &self.data.region_routes;
140        let leaders = find_leaders(region_routes);
141        let mut truncate_region_tasks = Vec::with_capacity(leaders.len());
142
143        for datanode in leaders {
144            let requester = self.context.node_manager.datanode(&datanode).await;
145            let regions = find_leader_regions(region_routes, &datanode);
146
147            for region in regions {
148                let region_id = RegionId::new(table_id, region);
149                debug!(
150                    "Truncating table {} region {} on Datanode {:?}",
151                    self.data.table_ref(),
152                    region_id,
153                    datanode
154                );
155
156                let request = RegionRequest {
157                    header: Some(RegionRequestHeader {
158                        tracing_context: TracingContext::from_current_span().to_w3c(),
159                        ..Default::default()
160                    }),
161                    body: Some(region_request::Body::Truncate(PbTruncateRegionRequest {
162                        region_id: region_id.as_u64(),
163                    })),
164                };
165
166                let datanode = datanode.clone();
167                let requester = requester.clone();
168
169                truncate_region_tasks.push(async move {
170                    requester
171                        .handle(request)
172                        .await
173                        .map_err(add_peer_context_if_needed(datanode))
174                });
175            }
176        }
177
178        join_all(truncate_region_tasks)
179            .await
180            .into_iter()
181            .collect::<Result<Vec<_>>>()?;
182
183        Ok(Status::done())
184    }
185}
186
187#[derive(Debug, Serialize, Deserialize)]
188pub struct TruncateTableData {
189    state: TruncateTableState,
190    task: TruncateTableTask,
191    table_info_value: DeserializedValueWithBytes<TableInfoValue>,
192    region_routes: Vec<RegionRoute>,
193}
194
195impl TruncateTableData {
196    pub fn new(
197        task: TruncateTableTask,
198        table_info_value: DeserializedValueWithBytes<TableInfoValue>,
199        region_routes: Vec<RegionRoute>,
200    ) -> Self {
201        Self {
202            state: TruncateTableState::Prepare,
203            task,
204            table_info_value,
205            region_routes,
206        }
207    }
208
209    pub fn table_ref(&self) -> TableReference {
210        self.task.table_ref()
211    }
212
213    pub fn table_name(&self) -> TableName {
214        self.task.table_name()
215    }
216
217    fn table_info(&self) -> &RawTableInfo {
218        &self.table_info_value.table_info
219    }
220
221    fn table_id(&self) -> TableId {
222        self.table_info().ident.table_id
223    }
224}
225
226#[derive(Debug, Serialize, Deserialize, AsRefStr)]
227enum TruncateTableState {
228    /// Prepares to truncate the table
229    Prepare,
230    /// Truncates regions on Datanode
231    DatanodeTruncateRegions,
232}