operator/req_convert/insert/
row_to_region.rs1use ahash::{HashMap, HashSet};
16use api::v1::region::{InsertRequest, InsertRequests as RegionInsertRequests};
17use api::v1::RowInsertRequests;
18use partition::manager::PartitionRuleManager;
19use snafu::OptionExt;
20use store_api::storage::{RegionId, RegionNumber};
21use table::metadata::{TableId, TableInfoRef};
22
23use crate::error::{Result, TableNotFoundSnafu};
24use crate::insert::InstantAndNormalInsertRequests;
25use crate::req_convert::common::partitioner::Partitioner;
26
27pub struct RowToRegion<'a> {
28 tables_info: HashMap<String, TableInfoRef>,
29 instant_table_ids: HashSet<TableId>,
30 partition_manager: &'a PartitionRuleManager,
31}
32
33impl<'a> RowToRegion<'a> {
34 pub fn new(
35 tables_info: HashMap<String, TableInfoRef>,
36 instant_table_ids: HashSet<TableId>,
37 partition_manager: &'a PartitionRuleManager,
38 ) -> Self {
39 Self {
40 tables_info,
41 instant_table_ids,
42 partition_manager,
43 }
44 }
45
46 pub async fn convert(
47 &self,
48 requests: RowInsertRequests,
49 ) -> Result<InstantAndNormalInsertRequests> {
50 let mut region_request = Vec::with_capacity(requests.inserts.len());
51 let mut instant_request = Vec::with_capacity(requests.inserts.len());
52 for request in requests.inserts {
53 let Some(rows) = request.rows else { continue };
54
55 let table_info = self.get_table_info(&request.table_name)?;
56 let table_id = table_info.table_id();
57 let region_numbers = self.region_numbers(&request.table_name)?;
58 let requests = if let Some(region_id) = match region_numbers[..] {
59 [singular] => Some(RegionId::new(table_id, singular)),
60 _ => None,
61 } {
62 vec![InsertRequest {
63 region_id: region_id.as_u64(),
64 rows: Some(rows),
65 }]
66 } else {
67 Partitioner::new(self.partition_manager)
68 .partition_insert_requests(table_info, rows)
69 .await?
70 };
71
72 if self.instant_table_ids.contains(&table_id) {
73 instant_request.extend(requests);
74 } else {
75 region_request.extend(requests);
76 }
77 }
78
79 Ok(InstantAndNormalInsertRequests {
80 normal_requests: RegionInsertRequests {
81 requests: region_request,
82 },
83 instant_requests: RegionInsertRequests {
84 requests: instant_request,
85 },
86 })
87 }
88
89 fn get_table_info(&self, table_name: &str) -> Result<&TableInfoRef> {
90 self.tables_info
91 .get(table_name)
92 .context(TableNotFoundSnafu { table_name })
93 }
94
95 fn region_numbers(&self, table_name: &str) -> Result<&Vec<RegionNumber>> {
96 self.tables_info
97 .get(table_name)
98 .map(|x| &x.meta.region_numbers)
99 .context(TableNotFoundSnafu { table_name })
100 }
101}