common_meta/ddl/
truncate_table.rs1use api::v1::region::{
16 region_request, RegionRequest, RegionRequestHeader, TruncateRequest as PbTruncateRegionRequest,
17};
18use async_trait::async_trait;
19use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
20use common_procedure::{
21 Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
22};
23use common_telemetry::debug;
24use common_telemetry::tracing_context::TracingContext;
25use futures::future::join_all;
26use serde::{Deserialize, Serialize};
27use snafu::{ensure, ResultExt};
28use store_api::storage::RegionId;
29use strum::AsRefStr;
30use table::metadata::{RawTableInfo, TableId};
31use table::table_name::TableName;
32use table::table_reference::TableReference;
33
34use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error};
35use crate::ddl::DdlContext;
36use crate::error::{Result, TableNotFoundSnafu};
37use crate::key::table_info::TableInfoValue;
38use crate::key::table_name::TableNameKey;
39use crate::key::DeserializedValueWithBytes;
40use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
41use crate::metrics;
42use crate::rpc::ddl::TruncateTableTask;
43use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
44
45pub struct TruncateTableProcedure {
46 context: DdlContext,
47 data: TruncateTableData,
48}
49
50#[async_trait]
51impl Procedure for TruncateTableProcedure {
52 fn type_name(&self) -> &str {
53 Self::TYPE_NAME
54 }
55
56 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
57 let state = &self.data.state;
58
59 let _timer = metrics::METRIC_META_PROCEDURE_TRUNCATE_TABLE
60 .with_label_values(&[state.as_ref()])
61 .start_timer();
62
63 match self.data.state {
64 TruncateTableState::Prepare => self.on_prepare().await,
65 TruncateTableState::DatanodeTruncateRegions => {
66 self.on_datanode_truncate_regions().await
67 }
68 }
69 .map_err(handle_retry_error)
70 }
71
72 fn dump(&self) -> ProcedureResult<String> {
73 serde_json::to_string(&self.data).context(ToJsonSnafu)
74 }
75
76 fn lock_key(&self) -> LockKey {
77 let table_ref = &self.data.table_ref();
78 let table_id = self.data.table_id();
79 let lock_key = vec![
80 CatalogLock::Read(table_ref.catalog).into(),
81 SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
82 TableLock::Write(table_id).into(),
83 ];
84
85 LockKey::new(lock_key)
86 }
87}
88
89impl TruncateTableProcedure {
90 pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::TruncateTable";
91
92 pub(crate) fn new(
93 task: TruncateTableTask,
94 table_info_value: DeserializedValueWithBytes<TableInfoValue>,
95 region_routes: Vec<RegionRoute>,
96 context: DdlContext,
97 ) -> Self {
98 Self {
99 context,
100 data: TruncateTableData::new(task, table_info_value, region_routes),
101 }
102 }
103
104 pub(crate) fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
105 let data = serde_json::from_str(json).context(FromJsonSnafu)?;
106 Ok(Self { context, data })
107 }
108
109 async fn on_prepare(&mut self) -> Result<Status> {
111 let table_ref = &self.data.table_ref();
112
113 let manager = &self.context.table_metadata_manager;
114
115 let exist = manager
116 .table_name_manager()
117 .exists(TableNameKey::new(
118 table_ref.catalog,
119 table_ref.schema,
120 table_ref.table,
121 ))
122 .await?;
123
124 ensure!(
125 exist,
126 TableNotFoundSnafu {
127 table_name: table_ref.to_string()
128 }
129 );
130
131 self.data.state = TruncateTableState::DatanodeTruncateRegions;
132
133 Ok(Status::executing(true))
134 }
135
136 async fn on_datanode_truncate_regions(&mut self) -> Result<Status> {
137 let table_id = self.data.table_id();
138
139 let region_routes = &self.data.region_routes;
140 let leaders = find_leaders(region_routes);
141 let mut truncate_region_tasks = Vec::with_capacity(leaders.len());
142
143 for datanode in leaders {
144 let requester = self.context.node_manager.datanode(&datanode).await;
145 let regions = find_leader_regions(region_routes, &datanode);
146
147 for region in regions {
148 let region_id = RegionId::new(table_id, region);
149 debug!(
150 "Truncating table {} region {} on Datanode {:?}",
151 self.data.table_ref(),
152 region_id,
153 datanode
154 );
155
156 let request = RegionRequest {
157 header: Some(RegionRequestHeader {
158 tracing_context: TracingContext::from_current_span().to_w3c(),
159 ..Default::default()
160 }),
161 body: Some(region_request::Body::Truncate(PbTruncateRegionRequest {
162 region_id: region_id.as_u64(),
163 })),
164 };
165
166 let datanode = datanode.clone();
167 let requester = requester.clone();
168
169 truncate_region_tasks.push(async move {
170 requester
171 .handle(request)
172 .await
173 .map_err(add_peer_context_if_needed(datanode))
174 });
175 }
176 }
177
178 join_all(truncate_region_tasks)
179 .await
180 .into_iter()
181 .collect::<Result<Vec<_>>>()?;
182
183 Ok(Status::done())
184 }
185}
186
187#[derive(Debug, Serialize, Deserialize)]
188pub struct TruncateTableData {
189 state: TruncateTableState,
190 task: TruncateTableTask,
191 table_info_value: DeserializedValueWithBytes<TableInfoValue>,
192 region_routes: Vec<RegionRoute>,
193}
194
195impl TruncateTableData {
196 pub fn new(
197 task: TruncateTableTask,
198 table_info_value: DeserializedValueWithBytes<TableInfoValue>,
199 region_routes: Vec<RegionRoute>,
200 ) -> Self {
201 Self {
202 state: TruncateTableState::Prepare,
203 task,
204 table_info_value,
205 region_routes,
206 }
207 }
208
209 pub fn table_ref(&self) -> TableReference {
210 self.task.table_ref()
211 }
212
213 pub fn table_name(&self) -> TableName {
214 self.task.table_name()
215 }
216
217 fn table_info(&self) -> &RawTableInfo {
218 &self.table_info_value.table_info
219 }
220
221 fn table_id(&self) -> TableId {
222 self.table_info().ident.table_id
223 }
224}
225
226#[derive(Debug, Serialize, Deserialize, AsRefStr)]
227enum TruncateTableState {
228 Prepare,
230 DatanodeTruncateRegions,
232}