operator/req_convert/delete/
row_to_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::region::DeleteRequests as RegionDeleteRequests;
16use api::v1::RowDeleteRequests;
17use catalog::CatalogManager;
18use partition::manager::PartitionRuleManager;
19use session::context::QueryContext;
20use snafu::{OptionExt, ResultExt};
21use table::TableRef;
22
23use crate::error::{CatalogSnafu, Result, TableNotFoundSnafu};
24use crate::req_convert::common::partitioner::Partitioner;
25
26pub struct RowToRegion<'a> {
27    catalog_manager: &'a dyn CatalogManager,
28    partition_manager: &'a PartitionRuleManager,
29    ctx: &'a QueryContext,
30}
31
32impl<'a> RowToRegion<'a> {
33    pub fn new(
34        catalog_manager: &'a dyn CatalogManager,
35        partition_manager: &'a PartitionRuleManager,
36        ctx: &'a QueryContext,
37    ) -> Self {
38        Self {
39            catalog_manager,
40            partition_manager,
41            ctx,
42        }
43    }
44
45    pub async fn convert(&self, requests: RowDeleteRequests) -> Result<RegionDeleteRequests> {
46        let mut region_request = Vec::with_capacity(requests.deletes.len());
47        for request in requests.deletes {
48            let table = self.get_table(&request.table_name).await?;
49            let table_id = table.table_info().table_id();
50
51            let requests = Partitioner::new(self.partition_manager)
52                .partition_delete_requests(table_id, request.rows.unwrap_or_default())
53                .await?;
54
55            region_request.extend(requests);
56        }
57
58        Ok(RegionDeleteRequests {
59            requests: region_request,
60        })
61    }
62
63    async fn get_table(&self, table_name: &str) -> Result<TableRef> {
64        let catalog_name = self.ctx.current_catalog();
65        let schema_name = self.ctx.current_schema();
66        self.catalog_manager
67            .table(catalog_name, &schema_name, table_name, None)
68            .await
69            .context(CatalogSnafu)?
70            .with_context(|| TableNotFoundSnafu {
71                table_name: common_catalog::format_full_table_name(
72                    catalog_name,
73                    &schema_name,
74                    table_name,
75                ),
76            })
77    }
78}