operator/req_convert/common/
partitioner.rs1use api::v1::region::{DeleteRequest, InsertRequest};
16use api::v1::Rows;
17use partition::manager::PartitionRuleManager;
18use snafu::ResultExt;
19use store_api::storage::RegionId;
20use table::metadata::TableInfo;
21
22use crate::error::{Result, SplitDeleteSnafu, SplitInsertSnafu};
23
24pub struct Partitioner<'a> {
25 partition_manager: &'a PartitionRuleManager,
26}
27
28impl<'a> Partitioner<'a> {
29 pub fn new(partition_manager: &'a PartitionRuleManager) -> Self {
30 Self { partition_manager }
31 }
32
33 pub async fn partition_insert_requests(
34 &self,
35 table_info: &TableInfo,
36 rows: Rows,
37 ) -> Result<Vec<InsertRequest>> {
38 let table_id = table_info.table_id();
39 let requests = self
40 .partition_manager
41 .split_rows(table_info, rows)
42 .await
43 .context(SplitInsertSnafu)?
44 .into_iter()
45 .map(|(region_number, rows)| InsertRequest {
46 region_id: RegionId::new(table_id, region_number).into(),
47 rows: Some(rows),
48 })
49 .collect();
50 Ok(requests)
51 }
52
53 pub async fn partition_delete_requests(
54 &self,
55 table_info: &TableInfo,
56 rows: Rows,
57 ) -> Result<Vec<DeleteRequest>> {
58 let table_id = table_info.table_id();
59 let requests = self
60 .partition_manager
61 .split_rows(table_info, rows)
62 .await
63 .context(SplitDeleteSnafu)?
64 .into_iter()
65 .map(|(region_number, rows)| DeleteRequest {
66 region_id: RegionId::new(table_id, region_number).into(),
67 rows: Some(rows),
68 })
69 .collect();
70 Ok(requests)
71 }
72}