operator/req_convert/delete/
row_to_region.rsuse api::v1::region::DeleteRequests as RegionDeleteRequests;
use api::v1::RowDeleteRequests;
use catalog::CatalogManager;
use partition::manager::PartitionRuleManager;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use table::TableRef;
use crate::error::{CatalogSnafu, Result, TableNotFoundSnafu};
use crate::req_convert::common::partitioner::Partitioner;
pub struct RowToRegion<'a> {
catalog_manager: &'a dyn CatalogManager,
partition_manager: &'a PartitionRuleManager,
ctx: &'a QueryContext,
}
impl<'a> RowToRegion<'a> {
pub fn new(
catalog_manager: &'a dyn CatalogManager,
partition_manager: &'a PartitionRuleManager,
ctx: &'a QueryContext,
) -> Self {
Self {
catalog_manager,
partition_manager,
ctx,
}
}
pub async fn convert(&self, requests: RowDeleteRequests) -> Result<RegionDeleteRequests> {
let mut region_request = Vec::with_capacity(requests.deletes.len());
for request in requests.deletes {
let table = self.get_table(&request.table_name).await?;
let table_id = table.table_info().table_id();
let requests = Partitioner::new(self.partition_manager)
.partition_delete_requests(table_id, request.rows.unwrap_or_default())
.await?;
region_request.extend(requests);
}
Ok(RegionDeleteRequests {
requests: region_request,
})
}
async fn get_table(&self, table_name: &str) -> Result<TableRef> {
let catalog_name = self.ctx.current_catalog();
let schema_name = self.ctx.current_schema();
self.catalog_manager
.table(catalog_name, &schema_name, table_name, None)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: common_catalog::format_full_table_name(
catalog_name,
&schema_name,
table_name,
),
})
}
}