operator/req_convert/common/
partitioner.rs1use api::v1::region::{DeleteRequest, InsertRequest};
16use api::v1::Rows;
17use partition::manager::PartitionRuleManager;
18use snafu::ResultExt;
19use store_api::storage::{RegionId, TableId};
20
21use crate::error::{Result, SplitDeleteSnafu, SplitInsertSnafu};
22
23pub struct Partitioner<'a> {
24 partition_manager: &'a PartitionRuleManager,
25}
26
27impl<'a> Partitioner<'a> {
28 pub fn new(partition_manager: &'a PartitionRuleManager) -> Self {
29 Self { partition_manager }
30 }
31
32 pub async fn partition_insert_requests(
33 &self,
34 table_id: TableId,
35 rows: Rows,
36 ) -> Result<Vec<InsertRequest>> {
37 let requests = self
38 .partition_manager
39 .split_rows(table_id, rows)
40 .await
41 .context(SplitInsertSnafu)?
42 .into_iter()
43 .map(|(region_number, rows)| InsertRequest {
44 region_id: RegionId::new(table_id, region_number).into(),
45 rows: Some(rows),
46 })
47 .collect();
48 Ok(requests)
49 }
50
51 pub async fn partition_delete_requests(
52 &self,
53 table_id: TableId,
54 rows: Rows,
55 ) -> Result<Vec<DeleteRequest>> {
56 let requests = self
57 .partition_manager
58 .split_rows(table_id, rows)
59 .await
60 .context(SplitDeleteSnafu)?
61 .into_iter()
62 .map(|(region_number, rows)| DeleteRequest {
63 region_id: RegionId::new(table_id, region_number).into(),
64 rows: Some(rows),
65 })
66 .collect();
67 Ok(requests)
68 }
69}