1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::{iter, mem};
18
19use api::v1::region::{DeleteRequests as RegionDeleteRequests, RegionRequestHeader};
20use api::v1::{DeleteRequests, RowDeleteRequests};
21use catalog::CatalogManagerRef;
22use common_meta::node_manager::{AffectedRows, NodeManagerRef};
23use common_meta::peer::Peer;
24use common_query::Output;
25use common_telemetry::tracing_context::TracingContext;
26use futures_util::future;
27use partition::manager::PartitionRuleManagerRef;
28use session::context::QueryContextRef;
29use snafu::{ensure, OptionExt, ResultExt};
30use table::requests::DeleteRequest as TableDeleteRequest;
31use table::TableRef;
32
33use crate::error::{
34 CatalogSnafu, FindRegionLeaderSnafu, InvalidDeleteRequestSnafu, JoinTaskSnafu,
35 MissingTimeIndexColumnSnafu, RequestDeletesSnafu, Result, TableNotFoundSnafu,
36};
37use crate::region_req_factory::RegionRequestFactory;
38use crate::req_convert::common::preprocess_row_delete_requests;
39use crate::req_convert::delete::{ColumnToRow, RowToRegion, TableToRegion};
40
41pub struct Deleter {
42 catalog_manager: CatalogManagerRef,
43 partition_manager: PartitionRuleManagerRef,
44 node_manager: NodeManagerRef,
45}
46
47pub type DeleterRef = Arc<Deleter>;
48
49impl Deleter {
50 pub fn new(
51 catalog_manager: CatalogManagerRef,
52 partition_manager: PartitionRuleManagerRef,
53 node_manager: NodeManagerRef,
54 ) -> Self {
55 Self {
56 catalog_manager,
57 partition_manager,
58 node_manager,
59 }
60 }
61
62 pub async fn handle_column_deletes(
63 &self,
64 requests: DeleteRequests,
65 ctx: QueryContextRef,
66 ) -> Result<Output> {
67 let row_deletes = ColumnToRow::convert(requests)?;
68 self.handle_row_deletes(row_deletes, ctx).await
69 }
70
71 pub async fn handle_row_deletes(
72 &self,
73 mut requests: RowDeleteRequests,
74 ctx: QueryContextRef,
75 ) -> Result<Output> {
76 preprocess_row_delete_requests(&mut requests.deletes)?;
77 requests.deletes.retain(|req| {
79 req.rows
80 .as_ref()
81 .map(|r| !r.rows.is_empty())
82 .unwrap_or_default()
83 });
84 validate_column_count_match(&requests)?;
85
86 let requests = self.trim_columns(requests, &ctx).await?;
87 let deletes = RowToRegion::new(
88 self.catalog_manager.as_ref(),
89 self.partition_manager.as_ref(),
90 &ctx,
91 )
92 .convert(requests)
93 .await?;
94
95 let affected_rows = self.do_request(deletes, &ctx).await?;
96
97 Ok(Output::new_with_affected_rows(affected_rows))
98 }
99
100 pub async fn handle_table_delete(
101 &self,
102 request: TableDeleteRequest,
103 ctx: QueryContextRef,
104 ) -> Result<AffectedRows> {
105 let catalog = request.catalog_name.as_str();
106 let schema = request.schema_name.as_str();
107 let table = request.table_name.as_str();
108 let table = self.get_table(catalog, schema, table).await?;
109 let table_info = table.table_info();
110
111 let deletes = TableToRegion::new(&table_info, &self.partition_manager)
112 .convert(request)
113 .await?;
114
115 let affected_rows = self.do_request(deletes, &ctx).await?;
116 Ok(affected_rows as _)
117 }
118}
119
120impl Deleter {
121 async fn do_request(
122 &self,
123 requests: RegionDeleteRequests,
124 ctx: &QueryContextRef,
125 ) -> Result<AffectedRows> {
126 let request_factory = RegionRequestFactory::new(RegionRequestHeader {
127 tracing_context: TracingContext::from_current_span().to_w3c(),
128 dbname: ctx.get_db_string(),
129 ..Default::default()
130 });
131
132 let tasks = self
133 .group_requests_by_peer(requests)
134 .await?
135 .into_iter()
136 .map(|(peer, deletes)| {
137 let request = request_factory.build_delete(deletes);
138 let node_manager = self.node_manager.clone();
139 common_runtime::spawn_global(async move {
140 node_manager
141 .datanode(&peer)
142 .await
143 .handle(request)
144 .await
145 .context(RequestDeletesSnafu)
146 })
147 });
148 let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;
149
150 let affected_rows = results
151 .into_iter()
152 .map(|resp| resp.map(|r| r.affected_rows))
153 .sum::<Result<AffectedRows>>()?;
154 crate::metrics::DIST_DELETE_ROW_COUNT
155 .with_label_values(&[ctx.get_db_string().as_ref()])
156 .inc_by(affected_rows as u64);
157 Ok(affected_rows)
158 }
159
160 async fn group_requests_by_peer(
161 &self,
162 requests: RegionDeleteRequests,
163 ) -> Result<HashMap<Peer, RegionDeleteRequests>> {
164 let mut deletes: HashMap<Peer, RegionDeleteRequests> = HashMap::new();
165
166 for req in requests.requests {
167 let peer = self
168 .partition_manager
169 .find_region_leader(req.region_id.into())
170 .await
171 .context(FindRegionLeaderSnafu)?;
172 deletes.entry(peer).or_default().requests.push(req);
173 }
174
175 Ok(deletes)
176 }
177
178 async fn trim_columns(
179 &self,
180 mut requests: RowDeleteRequests,
181 ctx: &QueryContextRef,
182 ) -> Result<RowDeleteRequests> {
183 for req in &mut requests.deletes {
184 let catalog = ctx.current_catalog();
185 let schema = ctx.current_schema();
186 let table = self.get_table(catalog, &schema, &req.table_name).await?;
187 let key_column_names = self.key_column_names(&table)?;
188
189 let rows = req.rows.as_mut().unwrap();
190 let all_key = rows
191 .schema
192 .iter()
193 .all(|column| key_column_names.contains(&column.column_name));
194 if all_key {
195 continue;
196 }
197
198 for row in &mut rows.rows {
199 let source_values = mem::take(&mut row.values);
200 row.values = rows
201 .schema
202 .iter()
203 .zip(source_values)
204 .filter_map(|(column, v)| {
205 key_column_names.contains(&column.column_name).then_some(v)
206 })
207 .collect();
208 }
209 rows.schema
210 .retain(|column| key_column_names.contains(&column.column_name));
211 }
212 Ok(requests)
213 }
214
215 fn key_column_names(&self, table: &TableRef) -> Result<HashSet<String>> {
216 let time_index = table
217 .schema()
218 .timestamp_column()
219 .with_context(|| table::error::MissingTimeIndexColumnSnafu {
220 table_name: table.table_info().name.clone(),
221 })
222 .context(MissingTimeIndexColumnSnafu)?
223 .name
224 .clone();
225
226 let key_column_names = table
227 .table_info()
228 .meta
229 .row_key_column_names()
230 .cloned()
231 .chain(iter::once(time_index))
232 .collect();
233
234 Ok(key_column_names)
235 }
236
237 async fn get_table(&self, catalog: &str, schema: &str, table: &str) -> Result<TableRef> {
238 self.catalog_manager
239 .table(catalog, schema, table, None)
240 .await
241 .context(CatalogSnafu)?
242 .with_context(|| TableNotFoundSnafu {
243 table_name: common_catalog::format_full_table_name(catalog, schema, table),
244 })
245 }
246}
247
248fn validate_column_count_match(requests: &RowDeleteRequests) -> Result<()> {
249 for request in &requests.deletes {
250 let rows = request.rows.as_ref().unwrap();
251 let column_count = rows.schema.len();
252 rows.rows.iter().try_for_each(|r| {
253 ensure!(
254 r.values.len() == column_count,
255 InvalidDeleteRequestSnafu {
256 reason: format!(
257 "column count mismatch, columns: {}, values: {}",
258 column_count,
259 r.values.len()
260 )
261 }
262 );
263 Ok(())
264 })?;
265 }
266 Ok(())
267}