// operator/req_convert/insert/table_to_region.rs

use api::v1::region::InsertRequests as RegionInsertRequests;
use api::v1::Rows;
use partition::manager::PartitionRuleManager;
use table::metadata::TableInfo;
use table::requests::InsertRequest as TableInsertRequest;

use crate::error::Result;
use crate::insert::InstantAndNormalInsertRequests;
use crate::req_convert::common::partitioner::Partitioner;
use crate::req_convert::common::{column_schema, row_count};

26pub struct TableToRegion<'a> {
27 table_info: &'a TableInfo,
28 partition_manager: &'a PartitionRuleManager,
29}
30
31impl<'a> TableToRegion<'a> {
32 pub fn new(table_info: &'a TableInfo, partition_manager: &'a PartitionRuleManager) -> Self {
33 Self {
34 table_info,
35 partition_manager,
36 }
37 }
38
39 pub async fn convert(
40 &self,
41 request: TableInsertRequest,
42 ) -> Result<InstantAndNormalInsertRequests> {
43 let row_count = row_count(&request.columns_values)?;
44 let schema = column_schema(self.table_info, &request.columns_values)?;
45 let rows = api::helper::vectors_to_rows(request.columns_values.values(), row_count);
46
47 let rows = Rows { schema, rows };
48 let requests = Partitioner::new(self.partition_manager)
49 .partition_insert_requests(self.table_info.table_id(), rows)
50 .await?;
51
52 let requests = RegionInsertRequests { requests };
53 if self.table_info.is_ttl_instant_table() {
54 Ok(InstantAndNormalInsertRequests {
55 normal_requests: Default::default(),
56 instant_requests: requests,
57 })
58 } else {
59 Ok(InstantAndNormalInsertRequests {
60 normal_requests: requests,
61 instant_requests: Default::default(),
62 })
63 }
64 }
65}
66
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use api::v1::region::InsertRequest as RegionInsertRequest;
    use api::v1::value::ValueData;
    use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType, Value};
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use common_meta::key::catalog_name::{CatalogManager, CatalogNameKey};
    use common_meta::key::schema_name::{SchemaManager, SchemaNameKey};
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::kv_backend::KvBackendRef;
    use datatypes::vectors::{Int32Vector, VectorRef};
    use store_api::storage::RegionId;

    use super::*;
    use crate::tests::{create_partition_rule_manager, new_test_table_info};

    /// Builds an in-memory kv backend with the default catalog and schema
    /// keys created in it.
    async fn prepare_mocked_backend() -> KvBackendRef {
        let backend = Arc::new(MemoryKvBackend::default());

        let catalog_manager = CatalogManager::new(backend.clone());
        let schema_manager = SchemaManager::new(backend.clone());

        catalog_manager
            .create(CatalogNameKey::default(), false)
            .await
            .unwrap();
        schema_manager
            .create(SchemaNameKey::default(), None, false)
            .await
            .unwrap();

        backend
    }

    /// Checks that a single-column table request is split into one request
    /// per region.
    ///
    /// The partition rule comes from `create_partition_rule_manager` (defined
    /// outside this module). The assertions expect value 101 in region 1,
    /// value 11 in region 2, and value 1 together with the NULL in region 3.
    #[tokio::test]
    async fn test_insert_request_table_to_region() {
        let backend = prepare_mocked_backend().await;
        let partition_manager = create_partition_rule_manager(backend.clone()).await;
        let table_info = new_test_table_info(1, "table_1", vec![0u32, 1, 2].into_iter());

        let converter = TableToRegion::new(&table_info, &partition_manager);

        let table_request = build_table_request(Arc::new(Int32Vector::from(vec![
            Some(1),
            None,
            Some(11),
            Some(101),
        ])));

        let region_requests = converter.convert(table_request).await.unwrap();
        // Index the produced requests by region id so each region can be
        // checked regardless of the order they were returned in.
        let mut region_id_to_region_requests = region_requests
            .normal_requests
            .requests
            .into_iter()
            .map(|r| (r.region_id, r))
            .collect::<HashMap<_, _>>();

        let region_id = RegionId::new(1, 1).as_u64();
        let region_request = region_id_to_region_requests.remove(&region_id).unwrap();
        assert_eq!(
            region_request,
            build_region_request(vec![Some(101)], region_id)
        );

        let region_id = RegionId::new(1, 2).as_u64();
        let region_request = region_id_to_region_requests.remove(&region_id).unwrap();
        assert_eq!(
            region_request,
            build_region_request(vec![Some(11)], region_id)
        );

        let region_id = RegionId::new(1, 3).as_u64();
        let region_request = region_id_to_region_requests.remove(&region_id).unwrap();
        assert_eq!(
            region_request,
            build_region_request(vec![Some(1), None], region_id)
        );
    }

    /// Builds a table insert request for `table_1` in the default catalog and
    /// schema, with `vector` as the values of its single column `a`.
    fn build_table_request(vector: VectorRef) -> TableInsertRequest {
        TableInsertRequest {
            catalog_name: DEFAULT_CATALOG_NAME.to_string(),
            schema_name: DEFAULT_SCHEMA_NAME.to_string(),
            table_name: "table_1".to_string(),
            columns_values: HashMap::from([("a".to_string(), vector)]),
        }
    }

    /// Builds the expected region insert request for `region_id`: one `Int32`
    /// tag column `a` with one row per entry in `rows`; `None` becomes a value
    /// with no `value_data`.
    fn build_region_request(rows: Vec<Option<i32>>, region_id: u64) -> RegionInsertRequest {
        RegionInsertRequest {
            region_id,
            rows: Some(Rows {
                schema: vec![ColumnSchema {
                    column_name: "a".to_string(),
                    datatype: ColumnDataType::Int32 as i32,
                    semantic_type: SemanticType::Tag as i32,
                    ..Default::default()
                }],
                rows: rows
                    .into_iter()
                    .map(|v| Row {
                        values: vec![Value {
                            value_data: v.map(ValueData::I32Value),
                        }],
                    })
                    .collect(),
            }),
        }
    }
}