operator/req_convert/delete/
row_to_region.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use api::v1::region::DeleteRequests as RegionDeleteRequests;
use api::v1::RowDeleteRequests;
use catalog::CatalogManager;
use partition::manager::PartitionRuleManager;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use table::TableRef;

use crate::error::{CatalogSnafu, Result, TableNotFoundSnafu};
use crate::req_convert::common::partitioner::Partitioner;

pub struct RowToRegion<'a> {
    catalog_manager: &'a dyn CatalogManager,
    partition_manager: &'a PartitionRuleManager,
    ctx: &'a QueryContext,
}

impl<'a> RowToRegion<'a> {
    pub fn new(
        catalog_manager: &'a dyn CatalogManager,
        partition_manager: &'a PartitionRuleManager,
        ctx: &'a QueryContext,
    ) -> Self {
        Self {
            catalog_manager,
            partition_manager,
            ctx,
        }
    }

    pub async fn convert(&self, requests: RowDeleteRequests) -> Result<RegionDeleteRequests> {
        let mut region_request = Vec::with_capacity(requests.deletes.len());
        for request in requests.deletes {
            let table = self.get_table(&request.table_name).await?;
            let table_id = table.table_info().table_id();

            let requests = Partitioner::new(self.partition_manager)
                .partition_delete_requests(table_id, request.rows.unwrap_or_default())
                .await?;

            region_request.extend(requests);
        }

        Ok(RegionDeleteRequests {
            requests: region_request,
        })
    }

    async fn get_table(&self, table_name: &str) -> Result<TableRef> {
        let catalog_name = self.ctx.current_catalog();
        let schema_name = self.ctx.current_schema();
        self.catalog_manager
            .table(catalog_name, &schema_name, table_name, None)
            .await
            .context(CatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                table_name: common_catalog::format_full_table_name(
                    catalog_name,
                    &schema_name,
                    table_name,
                ),
            })
    }
}