operator/req_convert/insert/
table_to_region.rs1use api::v1::region::InsertRequests as RegionInsertRequests;
16use api::v1::Rows;
17use partition::manager::PartitionRuleManager;
18use table::metadata::TableInfo;
19use table::requests::InsertRequest as TableInsertRequest;
20
21use crate::error::Result;
22use crate::insert::InstantAndNormalInsertRequests;
23use crate::req_convert::common::partitioner::Partitioner;
24use crate::req_convert::common::{column_schema, row_count};
25
26pub struct TableToRegion<'a> {
27 table_info: &'a TableInfo,
28 partition_manager: &'a PartitionRuleManager,
29}
30
31impl<'a> TableToRegion<'a> {
32 pub fn new(table_info: &'a TableInfo, partition_manager: &'a PartitionRuleManager) -> Self {
33 Self {
34 table_info,
35 partition_manager,
36 }
37 }
38
39 pub async fn convert(
40 &self,
41 request: TableInsertRequest,
42 ) -> Result<InstantAndNormalInsertRequests> {
43 let row_count = row_count(&request.columns_values)?;
44 let schema = column_schema(self.table_info, &request.columns_values)?;
45 let rows = api::helper::vectors_to_rows(request.columns_values.values(), row_count);
46
47 let rows = Rows { schema, rows };
48 let requests = Partitioner::new(self.partition_manager)
49 .partition_insert_requests(self.table_info.table_id(), rows)
50 .await?;
51
52 let requests = RegionInsertRequests { requests };
53 if self.table_info.is_ttl_instant_table() {
54 Ok(InstantAndNormalInsertRequests {
55 normal_requests: Default::default(),
56 instant_requests: requests,
57 })
58 } else {
59 Ok(InstantAndNormalInsertRequests {
60 normal_requests: requests,
61 instant_requests: Default::default(),
62 })
63 }
64 }
65}
66
67#[cfg(test)]
68mod tests {
69 use std::collections::HashMap;
70 use std::sync::Arc;
71
72 use api::v1::region::InsertRequest as RegionInsertRequest;
73 use api::v1::value::ValueData;
74 use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType, Value};
75 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
76 use datatypes::vectors::{Int32Vector, VectorRef};
77 use store_api::storage::RegionId;
78
79 use super::*;
80 use crate::tests::{
81 create_partition_rule_manager, new_test_table_info, prepare_mocked_backend,
82 };
83
84 #[tokio::test]
85 async fn test_insert_request_table_to_region() {
86 let backend = prepare_mocked_backend().await;
97 let partition_manager = create_partition_rule_manager(backend.clone()).await;
98 let table_info = new_test_table_info(1, "table_1", vec![0u32, 1, 2].into_iter());
99
100 let converter = TableToRegion::new(&table_info, &partition_manager);
101
102 let table_request = build_table_request(Arc::new(Int32Vector::from(vec![
103 Some(1),
104 None,
105 Some(11),
106 Some(101),
107 ])));
108
109 let region_requests = converter.convert(table_request).await.unwrap();
110 let mut region_id_to_region_requests = region_requests
111 .normal_requests
112 .requests
113 .into_iter()
114 .map(|r| (r.region_id, r))
115 .collect::<HashMap<_, _>>();
116
117 let region_id = RegionId::new(1, 1).as_u64();
118 let region_request = region_id_to_region_requests.remove(®ion_id).unwrap();
119 assert_eq!(
120 region_request,
121 build_region_request(vec![Some(101)], region_id)
122 );
123
124 let region_id = RegionId::new(1, 2).as_u64();
125 let region_request = region_id_to_region_requests.remove(®ion_id).unwrap();
126 assert_eq!(
127 region_request,
128 build_region_request(vec![Some(11)], region_id)
129 );
130
131 let region_id = RegionId::new(1, 3).as_u64();
132 let region_request = region_id_to_region_requests.remove(®ion_id).unwrap();
133 assert_eq!(
134 region_request,
135 build_region_request(vec![Some(1), None], region_id)
136 );
137 }
138
139 fn build_table_request(vector: VectorRef) -> TableInsertRequest {
140 TableInsertRequest {
141 catalog_name: DEFAULT_CATALOG_NAME.to_string(),
142 schema_name: DEFAULT_SCHEMA_NAME.to_string(),
143 table_name: "table_1".to_string(),
144 columns_values: HashMap::from([("a".to_string(), vector)]),
145 }
146 }
147
148 fn build_region_request(rows: Vec<Option<i32>>, region_id: u64) -> RegionInsertRequest {
149 RegionInsertRequest {
150 region_id,
151 rows: Some(Rows {
152 schema: vec![ColumnSchema {
153 column_name: "a".to_string(),
154 datatype: ColumnDataType::Int32 as i32,
155 semantic_type: SemanticType::Tag as i32,
156 ..Default::default()
157 }],
158 rows: rows
159 .into_iter()
160 .map(|v| Row {
161 values: vec![Value {
162 value_data: v.map(ValueData::I32Value),
163 }],
164 })
165 .collect(),
166 }),
167 }
168 }
169}