operator/req_convert/common/
partitioner.rs1use api::v1::region::{DeleteRequest, InsertRequest};
16use api::v1::{PartitionExprVersion, Rows};
17use partition::manager::PartitionRuleManager;
18use snafu::ResultExt;
19use store_api::storage::RegionId;
20use table::metadata::TableInfo;
21
22use crate::error::{Result, SplitDeleteSnafu, SplitInsertSnafu};
23
24pub struct Partitioner<'a> {
25 partition_manager: &'a PartitionRuleManager,
26}
27
28impl<'a> Partitioner<'a> {
29 pub fn new(partition_manager: &'a PartitionRuleManager) -> Self {
30 Self { partition_manager }
31 }
32
33 pub async fn partition_insert_requests(
34 &self,
35 table_info: &TableInfo,
36 rows: Rows,
37 ) -> Result<Vec<InsertRequest>> {
38 let table_id = table_info.table_id();
39 let requests = self
40 .partition_manager
41 .split_rows(table_info, rows)
42 .await
43 .context(SplitInsertSnafu)?
44 .into_iter()
45 .map(
46 |(region_number, (rows, partition_expr_version))| InsertRequest {
47 region_id: RegionId::new(table_id, region_number).into(),
48 rows: Some(rows),
49 partition_expr_version: partition_expr_version
50 .map(|value| PartitionExprVersion { value }),
51 },
52 )
53 .collect();
54 Ok(requests)
55 }
56
57 pub async fn partition_delete_requests(
58 &self,
59 table_info: &TableInfo,
60 rows: Rows,
61 ) -> Result<Vec<DeleteRequest>> {
62 let table_id = table_info.table_id();
63
64 let requests = self
65 .partition_manager
66 .split_rows(table_info, rows)
67 .await
68 .context(SplitDeleteSnafu)?
69 .into_iter()
70 .map(
71 |(region_number, (rows, partition_expr_version))| DeleteRequest {
72 region_id: RegionId::new(table_id, region_number).into(),
73 rows: Some(rows),
74 partition_expr_version: partition_expr_version
75 .map(|value| PartitionExprVersion { value }),
76 },
77 )
78 .collect();
79 Ok(requests)
80 }
81}