common_meta/ddl/
truncate_table.rs1use api::helper::to_pb_time_ranges;
16use api::v1::region::{
17 region_request, truncate_request, RegionRequest, RegionRequestHeader,
18 TruncateRequest as PbTruncateRegionRequest,
19};
20use async_trait::async_trait;
21use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
22use common_procedure::{
23 Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
24};
25use common_telemetry::debug;
26use common_telemetry::tracing_context::TracingContext;
27use futures::future::join_all;
28use serde::{Deserialize, Serialize};
29use snafu::{ensure, ResultExt};
30use store_api::storage::RegionId;
31use strum::AsRefStr;
32use table::metadata::{RawTableInfo, TableId};
33use table::table_name::TableName;
34use table::table_reference::TableReference;
35
36use crate::ddl::utils::{add_peer_context_if_needed, map_to_procedure_error};
37use crate::ddl::DdlContext;
38use crate::error::{ConvertTimeRangesSnafu, Result, TableNotFoundSnafu};
39use crate::key::table_info::TableInfoValue;
40use crate::key::table_name::TableNameKey;
41use crate::key::DeserializedValueWithBytes;
42use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
43use crate::metrics;
44use crate::rpc::ddl::TruncateTableTask;
45use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
46
47pub struct TruncateTableProcedure {
48 context: DdlContext,
49 data: TruncateTableData,
50}
51
52#[async_trait]
53impl Procedure for TruncateTableProcedure {
54 fn type_name(&self) -> &str {
55 Self::TYPE_NAME
56 }
57
58 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
59 let state = &self.data.state;
60
61 let _timer = metrics::METRIC_META_PROCEDURE_TRUNCATE_TABLE
62 .with_label_values(&[state.as_ref()])
63 .start_timer();
64
65 match self.data.state {
66 TruncateTableState::Prepare => self.on_prepare().await,
67 TruncateTableState::DatanodeTruncateRegions => {
68 self.on_datanode_truncate_regions().await
69 }
70 }
71 .map_err(map_to_procedure_error)
72 }
73
74 fn dump(&self) -> ProcedureResult<String> {
75 serde_json::to_string(&self.data).context(ToJsonSnafu)
76 }
77
78 fn lock_key(&self) -> LockKey {
79 let table_ref = &self.data.table_ref();
80 let table_id = self.data.table_id();
81 let lock_key = vec![
82 CatalogLock::Read(table_ref.catalog).into(),
83 SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
84 TableLock::Write(table_id).into(),
85 ];
86
87 LockKey::new(lock_key)
88 }
89}
90
91impl TruncateTableProcedure {
92 pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::TruncateTable";
93
94 pub(crate) fn new(
95 task: TruncateTableTask,
96 table_info_value: DeserializedValueWithBytes<TableInfoValue>,
97 region_routes: Vec<RegionRoute>,
98 context: DdlContext,
99 ) -> Self {
100 Self {
101 context,
102 data: TruncateTableData::new(task, table_info_value, region_routes),
103 }
104 }
105
106 pub(crate) fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
107 let data = serde_json::from_str(json).context(FromJsonSnafu)?;
108 Ok(Self { context, data })
109 }
110
111 async fn on_prepare(&mut self) -> Result<Status> {
113 let table_ref = &self.data.table_ref();
114
115 let manager = &self.context.table_metadata_manager;
116
117 let exist = manager
118 .table_name_manager()
119 .exists(TableNameKey::new(
120 table_ref.catalog,
121 table_ref.schema,
122 table_ref.table,
123 ))
124 .await?;
125
126 ensure!(
127 exist,
128 TableNotFoundSnafu {
129 table_name: table_ref.to_string()
130 }
131 );
132
133 self.data.state = TruncateTableState::DatanodeTruncateRegions;
134
135 Ok(Status::executing(true))
136 }
137
138 async fn on_datanode_truncate_regions(&mut self) -> Result<Status> {
139 let table_id = self.data.table_id();
140
141 let region_routes = &self.data.region_routes;
142 let leaders = find_leaders(region_routes);
143 let mut truncate_region_tasks = Vec::with_capacity(leaders.len());
144
145 for datanode in leaders {
146 let requester = self.context.node_manager.datanode(&datanode).await;
147 let regions = find_leader_regions(region_routes, &datanode);
148
149 for region in regions {
150 let region_id = RegionId::new(table_id, region);
151 debug!(
152 "Truncating table {} region {} on Datanode {:?}",
153 self.data.table_ref(),
154 region_id,
155 datanode
156 );
157
158 let time_ranges = &self.data.task.time_ranges;
159 let kind = if time_ranges.is_empty() {
160 truncate_request::Kind::All(api::v1::region::All {})
161 } else {
162 let pb_time_ranges =
163 to_pb_time_ranges(time_ranges).context(ConvertTimeRangesSnafu)?;
164 truncate_request::Kind::TimeRanges(pb_time_ranges)
165 };
166
167 let request = RegionRequest {
168 header: Some(RegionRequestHeader {
169 tracing_context: TracingContext::from_current_span().to_w3c(),
170 ..Default::default()
171 }),
172 body: Some(region_request::Body::Truncate(PbTruncateRegionRequest {
173 region_id: region_id.as_u64(),
174 kind: Some(kind),
175 })),
176 };
177
178 let datanode = datanode.clone();
179 let requester = requester.clone();
180
181 truncate_region_tasks.push(async move {
182 requester
183 .handle(request)
184 .await
185 .map_err(add_peer_context_if_needed(datanode))
186 });
187 }
188 }
189
190 join_all(truncate_region_tasks)
191 .await
192 .into_iter()
193 .collect::<Result<Vec<_>>>()?;
194
195 Ok(Status::done())
196 }
197}
198
199#[derive(Debug, Serialize, Deserialize)]
200pub struct TruncateTableData {
201 state: TruncateTableState,
202 task: TruncateTableTask,
203 table_info_value: DeserializedValueWithBytes<TableInfoValue>,
204 region_routes: Vec<RegionRoute>,
205}
206
207impl TruncateTableData {
208 pub fn new(
209 task: TruncateTableTask,
210 table_info_value: DeserializedValueWithBytes<TableInfoValue>,
211 region_routes: Vec<RegionRoute>,
212 ) -> Self {
213 Self {
214 state: TruncateTableState::Prepare,
215 task,
216 table_info_value,
217 region_routes,
218 }
219 }
220
221 pub fn table_ref(&self) -> TableReference {
222 self.task.table_ref()
223 }
224
225 pub fn table_name(&self) -> TableName {
226 self.task.table_name()
227 }
228
229 fn table_info(&self) -> &RawTableInfo {
230 &self.table_info_value.table_info
231 }
232
233 fn table_id(&self) -> TableId {
234 self.table_info().ident.table_id
235 }
236}
237
238#[derive(Debug, Serialize, Deserialize, AsRefStr)]
239enum TruncateTableState {
240 Prepare,
242 DatanodeTruncateRegions,
244}