operator/
delete.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::{iter, mem};
18
19use api::v1::region::{DeleteRequests as RegionDeleteRequests, RegionRequestHeader};
20use api::v1::{DeleteRequests, RowDeleteRequests};
21use catalog::CatalogManagerRef;
22use common_meta::node_manager::{AffectedRows, NodeManagerRef};
23use common_meta::peer::Peer;
24use common_query::Output;
25use common_telemetry::tracing_context::TracingContext;
26use futures_util::future;
27use partition::manager::PartitionRuleManagerRef;
28use session::context::QueryContextRef;
29use snafu::{ensure, OptionExt, ResultExt};
30use table::requests::DeleteRequest as TableDeleteRequest;
31use table::TableRef;
32
33use crate::error::{
34    CatalogSnafu, FindRegionLeaderSnafu, InvalidDeleteRequestSnafu, JoinTaskSnafu,
35    MissingTimeIndexColumnSnafu, RequestDeletesSnafu, Result, TableNotFoundSnafu,
36};
37use crate::region_req_factory::RegionRequestFactory;
38use crate::req_convert::common::preprocess_row_delete_requests;
39use crate::req_convert::delete::{ColumnToRow, RowToRegion, TableToRegion};
40
41pub struct Deleter {
42    catalog_manager: CatalogManagerRef,
43    partition_manager: PartitionRuleManagerRef,
44    node_manager: NodeManagerRef,
45}
46
47pub type DeleterRef = Arc<Deleter>;
48
49impl Deleter {
50    pub fn new(
51        catalog_manager: CatalogManagerRef,
52        partition_manager: PartitionRuleManagerRef,
53        node_manager: NodeManagerRef,
54    ) -> Self {
55        Self {
56            catalog_manager,
57            partition_manager,
58            node_manager,
59        }
60    }
61
62    pub async fn handle_column_deletes(
63        &self,
64        requests: DeleteRequests,
65        ctx: QueryContextRef,
66    ) -> Result<Output> {
67        let row_deletes = ColumnToRow::convert(requests)?;
68        self.handle_row_deletes(row_deletes, ctx).await
69    }
70
71    pub async fn handle_row_deletes(
72        &self,
73        mut requests: RowDeleteRequests,
74        ctx: QueryContextRef,
75    ) -> Result<Output> {
76        preprocess_row_delete_requests(&mut requests.deletes)?;
77        // remove empty requests
78        requests.deletes.retain(|req| {
79            req.rows
80                .as_ref()
81                .map(|r| !r.rows.is_empty())
82                .unwrap_or_default()
83        });
84        validate_column_count_match(&requests)?;
85
86        let requests = self.trim_columns(requests, &ctx).await?;
87        let deletes = RowToRegion::new(
88            self.catalog_manager.as_ref(),
89            self.partition_manager.as_ref(),
90            &ctx,
91        )
92        .convert(requests)
93        .await?;
94
95        let affected_rows = self.do_request(deletes, &ctx).await?;
96
97        Ok(Output::new_with_affected_rows(affected_rows))
98    }
99
100    pub async fn handle_table_delete(
101        &self,
102        request: TableDeleteRequest,
103        ctx: QueryContextRef,
104    ) -> Result<AffectedRows> {
105        let catalog = request.catalog_name.as_str();
106        let schema = request.schema_name.as_str();
107        let table = request.table_name.as_str();
108        let table = self.get_table(catalog, schema, table).await?;
109        let table_info = table.table_info();
110
111        let deletes = TableToRegion::new(&table_info, &self.partition_manager)
112            .convert(request)
113            .await?;
114
115        let affected_rows = self.do_request(deletes, &ctx).await?;
116        Ok(affected_rows as _)
117    }
118}
119
120impl Deleter {
121    async fn do_request(
122        &self,
123        requests: RegionDeleteRequests,
124        ctx: &QueryContextRef,
125    ) -> Result<AffectedRows> {
126        let request_factory = RegionRequestFactory::new(RegionRequestHeader {
127            tracing_context: TracingContext::from_current_span().to_w3c(),
128            dbname: ctx.get_db_string(),
129            ..Default::default()
130        });
131
132        let tasks = self
133            .group_requests_by_peer(requests)
134            .await?
135            .into_iter()
136            .map(|(peer, deletes)| {
137                let request = request_factory.build_delete(deletes);
138                let node_manager = self.node_manager.clone();
139                common_runtime::spawn_global(async move {
140                    node_manager
141                        .datanode(&peer)
142                        .await
143                        .handle(request)
144                        .await
145                        .context(RequestDeletesSnafu)
146                })
147            });
148        let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;
149
150        let affected_rows = results
151            .into_iter()
152            .map(|resp| resp.map(|r| r.affected_rows))
153            .sum::<Result<AffectedRows>>()?;
154        crate::metrics::DIST_DELETE_ROW_COUNT.inc_by(affected_rows as u64);
155        Ok(affected_rows)
156    }
157
158    async fn group_requests_by_peer(
159        &self,
160        requests: RegionDeleteRequests,
161    ) -> Result<HashMap<Peer, RegionDeleteRequests>> {
162        let mut deletes: HashMap<Peer, RegionDeleteRequests> = HashMap::new();
163
164        for req in requests.requests {
165            let peer = self
166                .partition_manager
167                .find_region_leader(req.region_id.into())
168                .await
169                .context(FindRegionLeaderSnafu)?;
170            deletes.entry(peer).or_default().requests.push(req);
171        }
172
173        Ok(deletes)
174    }
175
176    async fn trim_columns(
177        &self,
178        mut requests: RowDeleteRequests,
179        ctx: &QueryContextRef,
180    ) -> Result<RowDeleteRequests> {
181        for req in &mut requests.deletes {
182            let catalog = ctx.current_catalog();
183            let schema = ctx.current_schema();
184            let table = self.get_table(catalog, &schema, &req.table_name).await?;
185            let key_column_names = self.key_column_names(&table)?;
186
187            let rows = req.rows.as_mut().unwrap();
188            let all_key = rows
189                .schema
190                .iter()
191                .all(|column| key_column_names.contains(&column.column_name));
192            if all_key {
193                continue;
194            }
195
196            for row in &mut rows.rows {
197                let source_values = mem::take(&mut row.values);
198                row.values = rows
199                    .schema
200                    .iter()
201                    .zip(source_values)
202                    .filter_map(|(column, v)| {
203                        key_column_names.contains(&column.column_name).then_some(v)
204                    })
205                    .collect();
206            }
207            rows.schema
208                .retain(|column| key_column_names.contains(&column.column_name));
209        }
210        Ok(requests)
211    }
212
213    fn key_column_names(&self, table: &TableRef) -> Result<HashSet<String>> {
214        let time_index = table
215            .schema()
216            .timestamp_column()
217            .with_context(|| table::error::MissingTimeIndexColumnSnafu {
218                table_name: table.table_info().name.clone(),
219            })
220            .context(MissingTimeIndexColumnSnafu)?
221            .name
222            .clone();
223
224        let key_column_names = table
225            .table_info()
226            .meta
227            .row_key_column_names()
228            .cloned()
229            .chain(iter::once(time_index))
230            .collect();
231
232        Ok(key_column_names)
233    }
234
235    async fn get_table(&self, catalog: &str, schema: &str, table: &str) -> Result<TableRef> {
236        self.catalog_manager
237            .table(catalog, schema, table, None)
238            .await
239            .context(CatalogSnafu)?
240            .with_context(|| TableNotFoundSnafu {
241                table_name: common_catalog::format_full_table_name(catalog, schema, table),
242            })
243    }
244}
245
246fn validate_column_count_match(requests: &RowDeleteRequests) -> Result<()> {
247    for request in &requests.deletes {
248        let rows = request.rows.as_ref().unwrap();
249        let column_count = rows.schema.len();
250        rows.rows.iter().try_for_each(|r| {
251            ensure!(
252                r.values.len() == column_count,
253                InvalidDeleteRequestSnafu {
254                    reason: format!(
255                        "column count mismatch, columns: {}, values: {}",
256                        column_count,
257                        r.values.len()
258                    )
259                }
260            );
261            Ok(())
262        })?;
263    }
264    Ok(())
265}