operator/req_convert/delete/
row_to_region.rs1use api::v1::region::DeleteRequests as RegionDeleteRequests;
16use api::v1::RowDeleteRequests;
17use catalog::CatalogManager;
18use partition::manager::PartitionRuleManager;
19use session::context::QueryContext;
20use snafu::{OptionExt, ResultExt};
21use table::TableRef;
22
23use crate::error::{CatalogSnafu, Result, TableNotFoundSnafu};
24use crate::req_convert::common::partitioner::Partitioner;
25
26pub struct RowToRegion<'a> {
27 catalog_manager: &'a dyn CatalogManager,
28 partition_manager: &'a PartitionRuleManager,
29 ctx: &'a QueryContext,
30}
31
32impl<'a> RowToRegion<'a> {
33 pub fn new(
34 catalog_manager: &'a dyn CatalogManager,
35 partition_manager: &'a PartitionRuleManager,
36 ctx: &'a QueryContext,
37 ) -> Self {
38 Self {
39 catalog_manager,
40 partition_manager,
41 ctx,
42 }
43 }
44
45 pub async fn convert(&self, requests: RowDeleteRequests) -> Result<RegionDeleteRequests> {
46 let mut region_request = Vec::with_capacity(requests.deletes.len());
47 for request in requests.deletes {
48 let table = self.get_table(&request.table_name).await?;
49 let table_id = table.table_info().table_id();
50
51 let requests = Partitioner::new(self.partition_manager)
52 .partition_delete_requests(table_id, request.rows.unwrap_or_default())
53 .await?;
54
55 region_request.extend(requests);
56 }
57
58 Ok(RegionDeleteRequests {
59 requests: region_request,
60 })
61 }
62
63 async fn get_table(&self, table_name: &str) -> Result<TableRef> {
64 let catalog_name = self.ctx.current_catalog();
65 let schema_name = self.ctx.current_schema();
66 self.catalog_manager
67 .table(catalog_name, &schema_name, table_name, None)
68 .await
69 .context(CatalogSnafu)?
70 .with_context(|| TableNotFoundSnafu {
71 table_name: common_catalog::format_full_table_name(
72 catalog_name,
73 &schema_name,
74 table_name,
75 ),
76 })
77 }
78}