1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::{iter, mem};
18
19use api::v1::region::{DeleteRequests as RegionDeleteRequests, RegionRequestHeader};
20use api::v1::{DeleteRequests, RowDeleteRequests};
21use catalog::CatalogManagerRef;
22use common_meta::node_manager::{AffectedRows, NodeManagerRef};
23use common_meta::peer::Peer;
24use common_query::Output;
25use common_telemetry::tracing_context::TracingContext;
26use futures_util::future;
27use partition::manager::PartitionRuleManagerRef;
28use session::context::QueryContextRef;
29use snafu::{ensure, OptionExt, ResultExt};
30use table::requests::DeleteRequest as TableDeleteRequest;
31use table::TableRef;
32
33use crate::error::{
34 CatalogSnafu, FindRegionLeaderSnafu, InvalidDeleteRequestSnafu, JoinTaskSnafu,
35 MissingTimeIndexColumnSnafu, RequestDeletesSnafu, Result, TableNotFoundSnafu,
36};
37use crate::region_req_factory::RegionRequestFactory;
38use crate::req_convert::common::preprocess_row_delete_requests;
39use crate::req_convert::delete::{ColumnToRow, RowToRegion, TableToRegion};
40
/// Dispatches delete requests to the datanodes that lead the affected regions.
pub struct Deleter {
    /// Used to resolve catalog/schema/table names into table handles.
    catalog_manager: CatalogManagerRef,
    /// Used to split requests into region requests and to find region leaders.
    partition_manager: PartitionRuleManagerRef,
    /// Used to obtain a datanode client for each peer.
    node_manager: NodeManagerRef,
}

/// Shared, reference-counted handle to a [`Deleter`].
pub type DeleterRef = Arc<Deleter>;
48
49impl Deleter {
50 pub fn new(
51 catalog_manager: CatalogManagerRef,
52 partition_manager: PartitionRuleManagerRef,
53 node_manager: NodeManagerRef,
54 ) -> Self {
55 Self {
56 catalog_manager,
57 partition_manager,
58 node_manager,
59 }
60 }
61
62 pub async fn handle_column_deletes(
63 &self,
64 requests: DeleteRequests,
65 ctx: QueryContextRef,
66 ) -> Result<Output> {
67 let row_deletes = ColumnToRow::convert(requests)?;
68 self.handle_row_deletes(row_deletes, ctx).await
69 }
70
71 pub async fn handle_row_deletes(
72 &self,
73 mut requests: RowDeleteRequests,
74 ctx: QueryContextRef,
75 ) -> Result<Output> {
76 preprocess_row_delete_requests(&mut requests.deletes)?;
77 requests.deletes.retain(|req| {
79 req.rows
80 .as_ref()
81 .map(|r| !r.rows.is_empty())
82 .unwrap_or_default()
83 });
84 validate_column_count_match(&requests)?;
85
86 let requests = self.trim_columns(requests, &ctx).await?;
87 let deletes = RowToRegion::new(
88 self.catalog_manager.as_ref(),
89 self.partition_manager.as_ref(),
90 &ctx,
91 )
92 .convert(requests)
93 .await?;
94
95 let affected_rows = self.do_request(deletes, &ctx).await?;
96
97 Ok(Output::new_with_affected_rows(affected_rows))
98 }
99
100 pub async fn handle_table_delete(
101 &self,
102 request: TableDeleteRequest,
103 ctx: QueryContextRef,
104 ) -> Result<AffectedRows> {
105 let catalog = request.catalog_name.as_str();
106 let schema = request.schema_name.as_str();
107 let table = request.table_name.as_str();
108 let table = self.get_table(catalog, schema, table).await?;
109 let table_info = table.table_info();
110
111 let deletes = TableToRegion::new(&table_info, &self.partition_manager)
112 .convert(request)
113 .await?;
114
115 let affected_rows = self.do_request(deletes, &ctx).await?;
116 Ok(affected_rows as _)
117 }
118}
119
120impl Deleter {
121 async fn do_request(
122 &self,
123 requests: RegionDeleteRequests,
124 ctx: &QueryContextRef,
125 ) -> Result<AffectedRows> {
126 let request_factory = RegionRequestFactory::new(RegionRequestHeader {
127 tracing_context: TracingContext::from_current_span().to_w3c(),
128 dbname: ctx.get_db_string(),
129 ..Default::default()
130 });
131
132 let tasks = self
133 .group_requests_by_peer(requests)
134 .await?
135 .into_iter()
136 .map(|(peer, deletes)| {
137 let request = request_factory.build_delete(deletes);
138 let node_manager = self.node_manager.clone();
139 common_runtime::spawn_global(async move {
140 node_manager
141 .datanode(&peer)
142 .await
143 .handle(request)
144 .await
145 .context(RequestDeletesSnafu)
146 })
147 });
148 let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;
149
150 let affected_rows = results
151 .into_iter()
152 .map(|resp| resp.map(|r| r.affected_rows))
153 .sum::<Result<AffectedRows>>()?;
154 crate::metrics::DIST_DELETE_ROW_COUNT.inc_by(affected_rows as u64);
155 Ok(affected_rows)
156 }
157
158 async fn group_requests_by_peer(
159 &self,
160 requests: RegionDeleteRequests,
161 ) -> Result<HashMap<Peer, RegionDeleteRequests>> {
162 let mut deletes: HashMap<Peer, RegionDeleteRequests> = HashMap::new();
163
164 for req in requests.requests {
165 let peer = self
166 .partition_manager
167 .find_region_leader(req.region_id.into())
168 .await
169 .context(FindRegionLeaderSnafu)?;
170 deletes.entry(peer).or_default().requests.push(req);
171 }
172
173 Ok(deletes)
174 }
175
176 async fn trim_columns(
177 &self,
178 mut requests: RowDeleteRequests,
179 ctx: &QueryContextRef,
180 ) -> Result<RowDeleteRequests> {
181 for req in &mut requests.deletes {
182 let catalog = ctx.current_catalog();
183 let schema = ctx.current_schema();
184 let table = self.get_table(catalog, &schema, &req.table_name).await?;
185 let key_column_names = self.key_column_names(&table)?;
186
187 let rows = req.rows.as_mut().unwrap();
188 let all_key = rows
189 .schema
190 .iter()
191 .all(|column| key_column_names.contains(&column.column_name));
192 if all_key {
193 continue;
194 }
195
196 for row in &mut rows.rows {
197 let source_values = mem::take(&mut row.values);
198 row.values = rows
199 .schema
200 .iter()
201 .zip(source_values)
202 .filter_map(|(column, v)| {
203 key_column_names.contains(&column.column_name).then_some(v)
204 })
205 .collect();
206 }
207 rows.schema
208 .retain(|column| key_column_names.contains(&column.column_name));
209 }
210 Ok(requests)
211 }
212
213 fn key_column_names(&self, table: &TableRef) -> Result<HashSet<String>> {
214 let time_index = table
215 .schema()
216 .timestamp_column()
217 .with_context(|| table::error::MissingTimeIndexColumnSnafu {
218 table_name: table.table_info().name.clone(),
219 })
220 .context(MissingTimeIndexColumnSnafu)?
221 .name
222 .clone();
223
224 let key_column_names = table
225 .table_info()
226 .meta
227 .row_key_column_names()
228 .cloned()
229 .chain(iter::once(time_index))
230 .collect();
231
232 Ok(key_column_names)
233 }
234
235 async fn get_table(&self, catalog: &str, schema: &str, table: &str) -> Result<TableRef> {
236 self.catalog_manager
237 .table(catalog, schema, table, None)
238 .await
239 .context(CatalogSnafu)?
240 .with_context(|| TableNotFoundSnafu {
241 table_name: common_catalog::format_full_table_name(catalog, schema, table),
242 })
243 }
244}
245
246fn validate_column_count_match(requests: &RowDeleteRequests) -> Result<()> {
247 for request in &requests.deletes {
248 let rows = request.rows.as_ref().unwrap();
249 let column_count = rows.schema.len();
250 rows.rows.iter().try_for_each(|r| {
251 ensure!(
252 r.values.len() == column_count,
253 InvalidDeleteRequestSnafu {
254 reason: format!(
255 "column count mismatch, columns: {}, values: {}",
256 column_count,
257 r.values.len()
258 )
259 }
260 );
261 Ok(())
262 })?;
263 }
264 Ok(())
265}