operator/req_convert/insert/
row_to_region.rs1use ahash::{HashMap, HashSet};
16use api::v1::region::{InsertRequest, InsertRequests as RegionInsertRequests};
17use api::v1::RowInsertRequests;
18use partition::manager::PartitionRuleManager;
19use snafu::OptionExt;
20use store_api::storage::{RegionId, RegionNumber};
21use table::metadata::{TableId, TableInfoRef};
22
23use crate::error::{Result, TableNotFoundSnafu};
24use crate::insert::InstantAndNormalInsertRequests;
25use crate::req_convert::common::partitioner::Partitioner;
26
27pub struct RowToRegion<'a> {
28 tables_info: HashMap<String, TableInfoRef>,
29 instant_table_ids: HashSet<TableId>,
30 partition_manager: &'a PartitionRuleManager,
31}
32
33impl<'a> RowToRegion<'a> {
34 pub fn new(
35 tables_info: HashMap<String, TableInfoRef>,
36 instant_table_ids: HashSet<TableId>,
37 partition_manager: &'a PartitionRuleManager,
38 ) -> Self {
39 Self {
40 tables_info,
41 instant_table_ids,
42 partition_manager,
43 }
44 }
45
46 pub async fn convert(
47 &self,
48 requests: RowInsertRequests,
49 ) -> Result<InstantAndNormalInsertRequests> {
50 let mut region_request = Vec::with_capacity(requests.inserts.len());
51 let mut instant_request = Vec::with_capacity(requests.inserts.len());
52 for request in requests.inserts {
53 let Some(rows) = request.rows else { continue };
54
55 let table_id = self.get_table_id(&request.table_name)?;
56 let region_numbers = self.region_numbers(&request.table_name)?;
57 let requests = if let Some(region_id) = match region_numbers[..] {
58 [singular] => Some(RegionId::new(table_id, singular)),
59 _ => None,
60 } {
61 vec![InsertRequest {
62 region_id: region_id.as_u64(),
63 rows: Some(rows),
64 }]
65 } else {
66 Partitioner::new(self.partition_manager)
67 .partition_insert_requests(table_id, rows)
68 .await?
69 };
70
71 if self.instant_table_ids.contains(&table_id) {
72 instant_request.extend(requests);
73 } else {
74 region_request.extend(requests);
75 }
76 }
77
78 Ok(InstantAndNormalInsertRequests {
79 normal_requests: RegionInsertRequests {
80 requests: region_request,
81 },
82 instant_requests: RegionInsertRequests {
83 requests: instant_request,
84 },
85 })
86 }
87
88 fn get_table_id(&self, table_name: &str) -> Result<TableId> {
89 self.tables_info
90 .get(table_name)
91 .map(|x| x.table_id())
92 .context(TableNotFoundSnafu { table_name })
93 }
94
95 fn region_numbers(&self, table_name: &str) -> Result<&Vec<RegionNumber>> {
96 self.tables_info
97 .get(table_name)
98 .map(|x| &x.meta.region_numbers)
99 .context(TableNotFoundSnafu { table_name })
100 }
101}