operator/
delete.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::{iter, mem};
18
19use api::v1::region::{DeleteRequests as RegionDeleteRequests, RegionRequestHeader};
20use api::v1::{DeleteRequests, RowDeleteRequests};
21use catalog::CatalogManagerRef;
22use common_meta::node_manager::{AffectedRows, NodeManagerRef};
23use common_meta::peer::Peer;
24use common_query::Output;
25use common_telemetry::tracing_context::TracingContext;
26use futures_util::future;
27use partition::manager::PartitionRuleManagerRef;
28use session::context::QueryContextRef;
29use snafu::{ensure, OptionExt, ResultExt};
30use table::requests::DeleteRequest as TableDeleteRequest;
31use table::TableRef;
32
33use crate::error::{
34    CatalogSnafu, FindRegionLeaderSnafu, InvalidDeleteRequestSnafu, JoinTaskSnafu,
35    MissingTimeIndexColumnSnafu, RequestDeletesSnafu, Result, TableNotFoundSnafu,
36};
37use crate::region_req_factory::RegionRequestFactory;
38use crate::req_convert::common::preprocess_row_delete_requests;
39use crate::req_convert::delete::{ColumnToRow, RowToRegion, TableToRegion};
40
41pub struct Deleter {
42    catalog_manager: CatalogManagerRef,
43    partition_manager: PartitionRuleManagerRef,
44    node_manager: NodeManagerRef,
45}
46
47pub type DeleterRef = Arc<Deleter>;
48
49impl Deleter {
50    pub fn new(
51        catalog_manager: CatalogManagerRef,
52        partition_manager: PartitionRuleManagerRef,
53        node_manager: NodeManagerRef,
54    ) -> Self {
55        Self {
56            catalog_manager,
57            partition_manager,
58            node_manager,
59        }
60    }
61
62    pub async fn handle_column_deletes(
63        &self,
64        requests: DeleteRequests,
65        ctx: QueryContextRef,
66    ) -> Result<Output> {
67        let row_deletes = ColumnToRow::convert(requests)?;
68        self.handle_row_deletes(row_deletes, ctx).await
69    }
70
71    pub async fn handle_row_deletes(
72        &self,
73        mut requests: RowDeleteRequests,
74        ctx: QueryContextRef,
75    ) -> Result<Output> {
76        preprocess_row_delete_requests(&mut requests.deletes)?;
77        // remove empty requests
78        requests.deletes.retain(|req| {
79            req.rows
80                .as_ref()
81                .map(|r| !r.rows.is_empty())
82                .unwrap_or_default()
83        });
84        validate_column_count_match(&requests)?;
85
86        let requests = self.trim_columns(requests, &ctx).await?;
87        let deletes = RowToRegion::new(
88            self.catalog_manager.as_ref(),
89            self.partition_manager.as_ref(),
90            &ctx,
91        )
92        .convert(requests)
93        .await?;
94
95        let affected_rows = self.do_request(deletes, &ctx).await?;
96
97        Ok(Output::new_with_affected_rows(affected_rows))
98    }
99
100    pub async fn handle_table_delete(
101        &self,
102        request: TableDeleteRequest,
103        ctx: QueryContextRef,
104    ) -> Result<AffectedRows> {
105        let catalog = request.catalog_name.as_str();
106        let schema = request.schema_name.as_str();
107        let table = request.table_name.as_str();
108        let table = self.get_table(catalog, schema, table).await?;
109        let table_info = table.table_info();
110
111        let deletes = TableToRegion::new(&table_info, &self.partition_manager)
112            .convert(request)
113            .await?;
114
115        let affected_rows = self.do_request(deletes, &ctx).await?;
116        Ok(affected_rows as _)
117    }
118}
119
120impl Deleter {
121    async fn do_request(
122        &self,
123        requests: RegionDeleteRequests,
124        ctx: &QueryContextRef,
125    ) -> Result<AffectedRows> {
126        let request_factory = RegionRequestFactory::new(RegionRequestHeader {
127            tracing_context: TracingContext::from_current_span().to_w3c(),
128            dbname: ctx.get_db_string(),
129            ..Default::default()
130        });
131
132        let tasks = self
133            .group_requests_by_peer(requests)
134            .await?
135            .into_iter()
136            .map(|(peer, deletes)| {
137                let request = request_factory.build_delete(deletes);
138                let node_manager = self.node_manager.clone();
139                common_runtime::spawn_global(async move {
140                    node_manager
141                        .datanode(&peer)
142                        .await
143                        .handle(request)
144                        .await
145                        .context(RequestDeletesSnafu)
146                })
147            });
148        let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;
149
150        let affected_rows = results
151            .into_iter()
152            .map(|resp| resp.map(|r| r.affected_rows))
153            .sum::<Result<AffectedRows>>()?;
154        crate::metrics::DIST_DELETE_ROW_COUNT
155            .with_label_values(&[ctx.get_db_string().as_ref()])
156            .inc_by(affected_rows as u64);
157        Ok(affected_rows)
158    }
159
160    async fn group_requests_by_peer(
161        &self,
162        requests: RegionDeleteRequests,
163    ) -> Result<HashMap<Peer, RegionDeleteRequests>> {
164        let mut deletes: HashMap<Peer, RegionDeleteRequests> = HashMap::new();
165
166        for req in requests.requests {
167            let peer = self
168                .partition_manager
169                .find_region_leader(req.region_id.into())
170                .await
171                .context(FindRegionLeaderSnafu)?;
172            deletes.entry(peer).or_default().requests.push(req);
173        }
174
175        Ok(deletes)
176    }
177
178    async fn trim_columns(
179        &self,
180        mut requests: RowDeleteRequests,
181        ctx: &QueryContextRef,
182    ) -> Result<RowDeleteRequests> {
183        for req in &mut requests.deletes {
184            let catalog = ctx.current_catalog();
185            let schema = ctx.current_schema();
186            let table = self.get_table(catalog, &schema, &req.table_name).await?;
187            let key_column_names = self.key_column_names(&table)?;
188
189            let rows = req.rows.as_mut().unwrap();
190            let all_key = rows
191                .schema
192                .iter()
193                .all(|column| key_column_names.contains(&column.column_name));
194            if all_key {
195                continue;
196            }
197
198            for row in &mut rows.rows {
199                let source_values = mem::take(&mut row.values);
200                row.values = rows
201                    .schema
202                    .iter()
203                    .zip(source_values)
204                    .filter_map(|(column, v)| {
205                        key_column_names.contains(&column.column_name).then_some(v)
206                    })
207                    .collect();
208            }
209            rows.schema
210                .retain(|column| key_column_names.contains(&column.column_name));
211        }
212        Ok(requests)
213    }
214
215    fn key_column_names(&self, table: &TableRef) -> Result<HashSet<String>> {
216        let time_index = table
217            .schema()
218            .timestamp_column()
219            .with_context(|| table::error::MissingTimeIndexColumnSnafu {
220                table_name: table.table_info().name.clone(),
221            })
222            .context(MissingTimeIndexColumnSnafu)?
223            .name
224            .clone();
225
226        let key_column_names = table
227            .table_info()
228            .meta
229            .row_key_column_names()
230            .cloned()
231            .chain(iter::once(time_index))
232            .collect();
233
234        Ok(key_column_names)
235    }
236
237    async fn get_table(&self, catalog: &str, schema: &str, table: &str) -> Result<TableRef> {
238        self.catalog_manager
239            .table(catalog, schema, table, None)
240            .await
241            .context(CatalogSnafu)?
242            .with_context(|| TableNotFoundSnafu {
243                table_name: common_catalog::format_full_table_name(catalog, schema, table),
244            })
245    }
246}
247
248fn validate_column_count_match(requests: &RowDeleteRequests) -> Result<()> {
249    for request in &requests.deletes {
250        let rows = request.rows.as_ref().unwrap();
251        let column_count = rows.schema.len();
252        rows.rows.iter().try_for_each(|r| {
253            ensure!(
254                r.values.len() == column_count,
255                InvalidDeleteRequestSnafu {
256                    reason: format!(
257                        "column count mismatch, columns: {}, values: {}",
258                        column_count,
259                        r.values.len()
260                    )
261                }
262            );
263            Ok(())
264        })?;
265    }
266    Ok(())
267}