common_meta/ddl/
truncate_table.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::helper::to_pb_time_ranges;
16use api::v1::region::{
17    region_request, truncate_request, RegionRequest, RegionRequestHeader,
18    TruncateRequest as PbTruncateRegionRequest,
19};
20use async_trait::async_trait;
21use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
22use common_procedure::{
23    Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
24};
25use common_telemetry::debug;
26use common_telemetry::tracing_context::TracingContext;
27use futures::future::join_all;
28use serde::{Deserialize, Serialize};
29use snafu::{ensure, ResultExt};
30use store_api::storage::RegionId;
31use strum::AsRefStr;
32use table::metadata::{RawTableInfo, TableId};
33use table::table_name::TableName;
34use table::table_reference::TableReference;
35
36use crate::ddl::utils::{add_peer_context_if_needed, map_to_procedure_error};
37use crate::ddl::DdlContext;
38use crate::error::{ConvertTimeRangesSnafu, Result, TableNotFoundSnafu};
39use crate::key::table_info::TableInfoValue;
40use crate::key::table_name::TableNameKey;
41use crate::key::DeserializedValueWithBytes;
42use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
43use crate::metrics;
44use crate::rpc::ddl::TruncateTableTask;
45use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
46
47pub struct TruncateTableProcedure {
48    context: DdlContext,
49    data: TruncateTableData,
50}
51
52#[async_trait]
53impl Procedure for TruncateTableProcedure {
54    fn type_name(&self) -> &str {
55        Self::TYPE_NAME
56    }
57
58    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
59        let state = &self.data.state;
60
61        let _timer = metrics::METRIC_META_PROCEDURE_TRUNCATE_TABLE
62            .with_label_values(&[state.as_ref()])
63            .start_timer();
64
65        match self.data.state {
66            TruncateTableState::Prepare => self.on_prepare().await,
67            TruncateTableState::DatanodeTruncateRegions => {
68                self.on_datanode_truncate_regions().await
69            }
70        }
71        .map_err(map_to_procedure_error)
72    }
73
74    fn dump(&self) -> ProcedureResult<String> {
75        serde_json::to_string(&self.data).context(ToJsonSnafu)
76    }
77
78    fn lock_key(&self) -> LockKey {
79        let table_ref = &self.data.table_ref();
80        let table_id = self.data.table_id();
81        let lock_key = vec![
82            CatalogLock::Read(table_ref.catalog).into(),
83            SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
84            TableLock::Write(table_id).into(),
85        ];
86
87        LockKey::new(lock_key)
88    }
89}
90
91impl TruncateTableProcedure {
92    pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::TruncateTable";
93
94    pub(crate) fn new(
95        task: TruncateTableTask,
96        table_info_value: DeserializedValueWithBytes<TableInfoValue>,
97        region_routes: Vec<RegionRoute>,
98        context: DdlContext,
99    ) -> Self {
100        Self {
101            context,
102            data: TruncateTableData::new(task, table_info_value, region_routes),
103        }
104    }
105
106    pub(crate) fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
107        let data = serde_json::from_str(json).context(FromJsonSnafu)?;
108        Ok(Self { context, data })
109    }
110
111    // Checks whether the table exists.
112    async fn on_prepare(&mut self) -> Result<Status> {
113        let table_ref = &self.data.table_ref();
114
115        let manager = &self.context.table_metadata_manager;
116
117        let exist = manager
118            .table_name_manager()
119            .exists(TableNameKey::new(
120                table_ref.catalog,
121                table_ref.schema,
122                table_ref.table,
123            ))
124            .await?;
125
126        ensure!(
127            exist,
128            TableNotFoundSnafu {
129                table_name: table_ref.to_string()
130            }
131        );
132
133        self.data.state = TruncateTableState::DatanodeTruncateRegions;
134
135        Ok(Status::executing(true))
136    }
137
138    async fn on_datanode_truncate_regions(&mut self) -> Result<Status> {
139        let table_id = self.data.table_id();
140
141        let region_routes = &self.data.region_routes;
142        let leaders = find_leaders(region_routes);
143        let mut truncate_region_tasks = Vec::with_capacity(leaders.len());
144
145        for datanode in leaders {
146            let requester = self.context.node_manager.datanode(&datanode).await;
147            let regions = find_leader_regions(region_routes, &datanode);
148
149            for region in regions {
150                let region_id = RegionId::new(table_id, region);
151                debug!(
152                    "Truncating table {} region {} on Datanode {:?}",
153                    self.data.table_ref(),
154                    region_id,
155                    datanode
156                );
157
158                let time_ranges = &self.data.task.time_ranges;
159                let kind = if time_ranges.is_empty() {
160                    truncate_request::Kind::All(api::v1::region::All {})
161                } else {
162                    let pb_time_ranges =
163                        to_pb_time_ranges(time_ranges).context(ConvertTimeRangesSnafu)?;
164                    truncate_request::Kind::TimeRanges(pb_time_ranges)
165                };
166
167                let request = RegionRequest {
168                    header: Some(RegionRequestHeader {
169                        tracing_context: TracingContext::from_current_span().to_w3c(),
170                        ..Default::default()
171                    }),
172                    body: Some(region_request::Body::Truncate(PbTruncateRegionRequest {
173                        region_id: region_id.as_u64(),
174                        kind: Some(kind),
175                    })),
176                };
177
178                let datanode = datanode.clone();
179                let requester = requester.clone();
180
181                truncate_region_tasks.push(async move {
182                    requester
183                        .handle(request)
184                        .await
185                        .map_err(add_peer_context_if_needed(datanode))
186                });
187            }
188        }
189
190        join_all(truncate_region_tasks)
191            .await
192            .into_iter()
193            .collect::<Result<Vec<_>>>()?;
194
195        Ok(Status::done())
196    }
197}
198
199#[derive(Debug, Serialize, Deserialize)]
200pub struct TruncateTableData {
201    state: TruncateTableState,
202    task: TruncateTableTask,
203    table_info_value: DeserializedValueWithBytes<TableInfoValue>,
204    region_routes: Vec<RegionRoute>,
205}
206
207impl TruncateTableData {
208    pub fn new(
209        task: TruncateTableTask,
210        table_info_value: DeserializedValueWithBytes<TableInfoValue>,
211        region_routes: Vec<RegionRoute>,
212    ) -> Self {
213        Self {
214            state: TruncateTableState::Prepare,
215            task,
216            table_info_value,
217            region_routes,
218        }
219    }
220
221    pub fn table_ref(&self) -> TableReference {
222        self.task.table_ref()
223    }
224
225    pub fn table_name(&self) -> TableName {
226        self.task.table_name()
227    }
228
229    fn table_info(&self) -> &RawTableInfo {
230        &self.table_info_value.table_info
231    }
232
233    fn table_id(&self) -> TableId {
234        self.table_info().ident.table_id
235    }
236}
237
238#[derive(Debug, Serialize, Deserialize, AsRefStr)]
239enum TruncateTableState {
240    /// Prepares to truncate the table
241    Prepare,
242    /// Truncates regions on Datanode
243    DatanodeTruncateRegions,
244}