common_meta/ddl/
truncate_table.rs1use api::helper::to_pb_time_ranges;
16use api::v1::region::{
17 RegionRequest, RegionRequestHeader, TruncateRequest as PbTruncateRegionRequest, region_request,
18 truncate_request,
19};
20use async_trait::async_trait;
21use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
22use common_procedure::{
23 Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
24};
25use common_telemetry::debug;
26use common_telemetry::tracing_context::TracingContext;
27use futures::future::join_all;
28use serde::{Deserialize, Serialize};
29use snafu::{ResultExt, ensure};
30use store_api::storage::RegionId;
31use strum::AsRefStr;
32use table::metadata::{TableId, TableInfo};
33use table::table_name::TableName;
34use table::table_reference::TableReference;
35
36use crate::ddl::DdlContext;
37use crate::ddl::utils::{add_peer_context_if_needed, map_to_procedure_error};
38use crate::error::{ConvertTimeRangesSnafu, Result, TableNotFoundSnafu};
39use crate::key::DeserializedValueWithBytes;
40use crate::key::table_info::TableInfoValue;
41use crate::key::table_name::TableNameKey;
42use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
43use crate::metrics;
44use crate::rpc::ddl::TruncateTableTask;
45use crate::rpc::router::{find_leader_regions, find_leaders};
46
47pub struct TruncateTableProcedure {
48 context: DdlContext,
49 data: TruncateTableData,
50}
51
52#[async_trait]
53impl Procedure for TruncateTableProcedure {
54 fn type_name(&self) -> &str {
55 Self::TYPE_NAME
56 }
57
58 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
59 let state = &self.data.state;
60
61 let _timer = metrics::METRIC_META_PROCEDURE_TRUNCATE_TABLE
62 .with_label_values(&[state.as_ref()])
63 .start_timer();
64
65 match self.data.state {
66 TruncateTableState::Prepare => self.on_prepare().await,
67 TruncateTableState::DatanodeTruncateRegions => {
68 self.on_datanode_truncate_regions().await
69 }
70 }
71 .map_err(map_to_procedure_error)
72 }
73
74 fn dump(&self) -> ProcedureResult<String> {
75 serde_json::to_string(&self.data).context(ToJsonSnafu)
76 }
77
78 fn lock_key(&self) -> LockKey {
79 let table_ref = &self.data.table_ref();
80 let table_id = self.data.table_id();
81 let lock_key = vec![
82 CatalogLock::Read(table_ref.catalog).into(),
83 SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
84 TableLock::Write(table_id).into(),
85 ];
86
87 LockKey::new(lock_key)
88 }
89}
90
91impl TruncateTableProcedure {
92 pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::TruncateTable";
93
94 pub(crate) fn new(
95 task: TruncateTableTask,
96 table_info_value: DeserializedValueWithBytes<TableInfoValue>,
97 context: DdlContext,
98 ) -> Self {
99 Self {
100 context,
101 data: TruncateTableData::new(task, table_info_value),
102 }
103 }
104
105 pub(crate) fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
106 let data = serde_json::from_str(json).context(FromJsonSnafu)?;
107 Ok(Self { context, data })
108 }
109
110 async fn on_prepare(&mut self) -> Result<Status> {
112 let table_ref = &self.data.table_ref();
113
114 let manager = &self.context.table_metadata_manager;
115
116 let exist = manager
117 .table_name_manager()
118 .exists(TableNameKey::new(
119 table_ref.catalog,
120 table_ref.schema,
121 table_ref.table,
122 ))
123 .await?;
124
125 ensure!(
126 exist,
127 TableNotFoundSnafu {
128 table_name: table_ref.to_string()
129 }
130 );
131
132 self.data.state = TruncateTableState::DatanodeTruncateRegions;
133
134 Ok(Status::executing(true))
135 }
136
137 async fn on_datanode_truncate_regions(&mut self) -> Result<Status> {
138 let table_id = self.data.table_id();
139
140 let (_, physical_table_route) = self
141 .context
142 .table_metadata_manager
143 .table_route_manager()
144 .get_physical_table_route(table_id)
145 .await?;
146 let leaders = find_leaders(&physical_table_route.region_routes);
147 let mut truncate_region_tasks = Vec::with_capacity(leaders.len());
148
149 for datanode in leaders {
150 let requester = self.context.node_manager.datanode(&datanode).await;
151 let regions = find_leader_regions(&physical_table_route.region_routes, &datanode);
152
153 for region in regions {
154 let region_id = RegionId::new(table_id, region);
155 debug!(
156 "Truncating table {} region {} on Datanode {:?}",
157 self.data.table_ref(),
158 region_id,
159 datanode
160 );
161
162 let time_ranges = &self.data.task.time_ranges;
163 let kind = if time_ranges.is_empty() {
164 truncate_request::Kind::All(api::v1::region::All {})
165 } else {
166 let pb_time_ranges =
167 to_pb_time_ranges(time_ranges).context(ConvertTimeRangesSnafu)?;
168 truncate_request::Kind::TimeRanges(pb_time_ranges)
169 };
170
171 let request = RegionRequest {
172 header: Some(RegionRequestHeader {
173 tracing_context: TracingContext::from_current_span().to_w3c(),
174 ..Default::default()
175 }),
176 body: Some(region_request::Body::Truncate(PbTruncateRegionRequest {
177 region_id: region_id.as_u64(),
178 kind: Some(kind),
179 })),
180 };
181
182 let datanode = datanode.clone();
183 let requester = requester.clone();
184
185 truncate_region_tasks.push(async move {
186 requester
187 .handle(request)
188 .await
189 .map_err(add_peer_context_if_needed(datanode))
190 });
191 }
192 }
193
194 join_all(truncate_region_tasks)
195 .await
196 .into_iter()
197 .collect::<Result<Vec<_>>>()?;
198
199 Ok(Status::done())
200 }
201}
202
203#[derive(Debug, Serialize, Deserialize)]
204pub struct TruncateTableData {
205 state: TruncateTableState,
206 task: TruncateTableTask,
207 table_info_value: DeserializedValueWithBytes<TableInfoValue>,
208}
209
210impl TruncateTableData {
211 pub fn new(
212 task: TruncateTableTask,
213 table_info_value: DeserializedValueWithBytes<TableInfoValue>,
214 ) -> Self {
215 Self {
216 state: TruncateTableState::Prepare,
217 task,
218 table_info_value,
219 }
220 }
221
222 pub fn table_ref(&self) -> TableReference<'_> {
223 self.task.table_ref()
224 }
225
226 pub fn table_name(&self) -> TableName {
227 self.task.table_name()
228 }
229
230 fn table_info(&self) -> &TableInfo {
231 &self.table_info_value.table_info
232 }
233
234 fn table_id(&self) -> TableId {
235 self.table_info().ident.table_id
236 }
237}
238
239#[derive(Debug, Serialize, Deserialize, AsRefStr)]
240enum TruncateTableState {
241 Prepare,
243 DatanodeTruncateRegions,
245}