Skip to main content

common_meta/ddl/
truncate_table.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::helper::to_pb_time_ranges;
16use api::v1::region::{
17    RegionRequest, RegionRequestHeader, TruncateRequest as PbTruncateRegionRequest, region_request,
18    truncate_request,
19};
20use async_trait::async_trait;
21use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
22use common_procedure::{
23    Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
24};
25use common_telemetry::debug;
26use common_telemetry::tracing_context::TracingContext;
27use futures::future::join_all;
28use serde::{Deserialize, Serialize};
29use snafu::{ResultExt, ensure};
30use store_api::storage::RegionId;
31use strum::AsRefStr;
32use table::metadata::{TableId, TableInfo};
33use table::table_name::TableName;
34use table::table_reference::TableReference;
35
36use crate::ddl::DdlContext;
37use crate::ddl::utils::{add_peer_context_if_needed, map_to_procedure_error};
38use crate::error::{ConvertTimeRangesSnafu, Result, TableNotFoundSnafu};
39use crate::key::DeserializedValueWithBytes;
40use crate::key::table_info::TableInfoValue;
41use crate::key::table_name::TableNameKey;
42use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
43use crate::metrics;
44use crate::rpc::ddl::TruncateTableTask;
45use crate::rpc::router::{find_leader_regions, find_leaders};
46
47pub struct TruncateTableProcedure {
48    context: DdlContext,
49    data: TruncateTableData,
50}
51
52#[async_trait]
53impl Procedure for TruncateTableProcedure {
54    fn type_name(&self) -> &str {
55        Self::TYPE_NAME
56    }
57
58    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
59        let state = &self.data.state;
60
61        let _timer = metrics::METRIC_META_PROCEDURE_TRUNCATE_TABLE
62            .with_label_values(&[state.as_ref()])
63            .start_timer();
64
65        match self.data.state {
66            TruncateTableState::Prepare => self.on_prepare().await,
67            TruncateTableState::DatanodeTruncateRegions => {
68                self.on_datanode_truncate_regions().await
69            }
70        }
71        .map_err(map_to_procedure_error)
72    }
73
74    fn dump(&self) -> ProcedureResult<String> {
75        serde_json::to_string(&self.data).context(ToJsonSnafu)
76    }
77
78    fn lock_key(&self) -> LockKey {
79        let table_ref = &self.data.table_ref();
80        let table_id = self.data.table_id();
81        let lock_key = vec![
82            CatalogLock::Read(table_ref.catalog).into(),
83            SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
84            TableLock::Write(table_id).into(),
85        ];
86
87        LockKey::new(lock_key)
88    }
89}
90
91impl TruncateTableProcedure {
92    pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::TruncateTable";
93
94    pub(crate) fn new(
95        task: TruncateTableTask,
96        table_info_value: DeserializedValueWithBytes<TableInfoValue>,
97        context: DdlContext,
98    ) -> Self {
99        Self {
100            context,
101            data: TruncateTableData::new(task, table_info_value),
102        }
103    }
104
105    pub(crate) fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
106        let data = serde_json::from_str(json).context(FromJsonSnafu)?;
107        Ok(Self { context, data })
108    }
109
110    // Checks whether the table exists.
111    async fn on_prepare(&mut self) -> Result<Status> {
112        let table_ref = &self.data.table_ref();
113
114        let manager = &self.context.table_metadata_manager;
115
116        let exist = manager
117            .table_name_manager()
118            .exists(TableNameKey::new(
119                table_ref.catalog,
120                table_ref.schema,
121                table_ref.table,
122            ))
123            .await?;
124
125        ensure!(
126            exist,
127            TableNotFoundSnafu {
128                table_name: table_ref.to_string()
129            }
130        );
131
132        self.data.state = TruncateTableState::DatanodeTruncateRegions;
133
134        Ok(Status::executing(true))
135    }
136
137    async fn on_datanode_truncate_regions(&mut self) -> Result<Status> {
138        let table_id = self.data.table_id();
139
140        let (_, physical_table_route) = self
141            .context
142            .table_metadata_manager
143            .table_route_manager()
144            .get_physical_table_route(table_id)
145            .await?;
146        let leaders = find_leaders(&physical_table_route.region_routes);
147        let mut truncate_region_tasks = Vec::with_capacity(leaders.len());
148
149        for datanode in leaders {
150            let requester = self.context.node_manager.datanode(&datanode).await;
151            let regions = find_leader_regions(&physical_table_route.region_routes, &datanode);
152
153            for region in regions {
154                let region_id = RegionId::new(table_id, region);
155                debug!(
156                    "Truncating table {} region {} on Datanode {:?}",
157                    self.data.table_ref(),
158                    region_id,
159                    datanode
160                );
161
162                let time_ranges = &self.data.task.time_ranges;
163                let kind = if time_ranges.is_empty() {
164                    truncate_request::Kind::All(api::v1::region::All {})
165                } else {
166                    let pb_time_ranges =
167                        to_pb_time_ranges(time_ranges).context(ConvertTimeRangesSnafu)?;
168                    truncate_request::Kind::TimeRanges(pb_time_ranges)
169                };
170
171                let request = RegionRequest {
172                    header: Some(RegionRequestHeader {
173                        tracing_context: TracingContext::from_current_span().to_w3c(),
174                        ..Default::default()
175                    }),
176                    body: Some(region_request::Body::Truncate(PbTruncateRegionRequest {
177                        region_id: region_id.as_u64(),
178                        kind: Some(kind),
179                    })),
180                };
181
182                let datanode = datanode.clone();
183                let requester = requester.clone();
184
185                truncate_region_tasks.push(async move {
186                    requester
187                        .handle(request)
188                        .await
189                        .map_err(add_peer_context_if_needed(datanode))
190                });
191            }
192        }
193
194        join_all(truncate_region_tasks)
195            .await
196            .into_iter()
197            .collect::<Result<Vec<_>>>()?;
198
199        Ok(Status::done())
200    }
201}
202
203#[derive(Debug, Serialize, Deserialize)]
204pub struct TruncateTableData {
205    state: TruncateTableState,
206    task: TruncateTableTask,
207    table_info_value: DeserializedValueWithBytes<TableInfoValue>,
208}
209
210impl TruncateTableData {
211    pub fn new(
212        task: TruncateTableTask,
213        table_info_value: DeserializedValueWithBytes<TableInfoValue>,
214    ) -> Self {
215        Self {
216            state: TruncateTableState::Prepare,
217            task,
218            table_info_value,
219        }
220    }
221
222    pub fn table_ref(&self) -> TableReference<'_> {
223        self.task.table_ref()
224    }
225
226    pub fn table_name(&self) -> TableName {
227        self.task.table_name()
228    }
229
230    fn table_info(&self) -> &TableInfo {
231        &self.table_info_value.table_info
232    }
233
234    fn table_id(&self) -> TableId {
235        self.table_info().ident.table_id
236    }
237}
238
239#[derive(Debug, Serialize, Deserialize, AsRefStr)]
240enum TruncateTableState {
241    /// Prepares to truncate the table
242    Prepare,
243    /// Truncates regions on Datanode
244    DatanodeTruncateRegions,
245}