operator/req_convert/insert/
table_to_region.rs1use api::v1::Rows;
16use api::v1::region::InsertRequests as RegionInsertRequests;
17use partition::manager::PartitionRuleManager;
18use table::metadata::TableInfo;
19use table::requests::InsertRequest as TableInsertRequest;
20
21use crate::error::Result;
22use crate::insert::InstantAndNormalInsertRequests;
23use crate::req_convert::common::partitioner::Partitioner;
24use crate::req_convert::common::{column_schema, row_count};
25
26pub struct TableToRegion<'a> {
27 table_info: &'a TableInfo,
28 partition_manager: &'a PartitionRuleManager,
29}
30
31impl<'a> TableToRegion<'a> {
32 pub fn new(table_info: &'a TableInfo, partition_manager: &'a PartitionRuleManager) -> Self {
33 Self {
34 table_info,
35 partition_manager,
36 }
37 }
38
39 pub async fn convert(
40 &self,
41 request: TableInsertRequest,
42 ) -> Result<InstantAndNormalInsertRequests> {
43 let row_count = row_count(&request.columns_values)?;
44 let schema = column_schema(self.table_info, &request.columns_values)?;
45 let rows = api::helper::vectors_to_rows(request.columns_values.values(), row_count);
46
47 let rows = Rows { schema, rows };
48 let requests = Partitioner::new(self.partition_manager)
49 .partition_insert_requests(self.table_info, rows)
50 .await?;
51
52 let requests = RegionInsertRequests { requests };
53 if self.table_info.is_ttl_instant_table() {
54 Ok(InstantAndNormalInsertRequests {
55 normal_requests: Default::default(),
56 instant_requests: requests,
57 })
58 } else {
59 Ok(InstantAndNormalInsertRequests {
60 normal_requests: requests,
61 instant_requests: Default::default(),
62 })
63 }
64 }
65}
66
67#[cfg(test)]
68mod tests {
69 use std::collections::HashMap;
70 use std::sync::Arc;
71
72 use api::v1::helper::tag_column_schema;
73 use api::v1::region::InsertRequest as RegionInsertRequest;
74 use api::v1::value::ValueData;
75 use api::v1::{ColumnDataType, PartitionExprVersion, Row, Value};
76 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
77 use datatypes::vectors::{Int32Vector, VectorRef};
78 use store_api::storage::RegionId;
79
80 use super::*;
81 use crate::tests::{
82 create_partition_rule_manager, new_test_table_info, prepare_mocked_backend,
83 };
84
85 #[tokio::test]
86 async fn test_insert_request_table_to_region() {
87 let backend = prepare_mocked_backend().await;
98 let partition_manager = create_partition_rule_manager(backend.clone()).await;
99 let table_info = new_test_table_info(1, "table_1", vec![0u32, 1, 2].into_iter());
100
101 let converter = TableToRegion::new(&table_info, &partition_manager);
102
103 let table_request = build_table_request(Arc::new(Int32Vector::from(vec![
104 Some(1),
105 None,
106 Some(11),
107 Some(101),
108 ])));
109 let versions = partition_manager
110 .find_physical_partition_info(1)
111 .await
112 .unwrap()
113 .partitions
114 .iter()
115 .map(|p| (p.id.as_u64(), p.partition_expr_version))
116 .collect::<HashMap<_, _>>();
117
118 let region_requests = converter.convert(table_request).await.unwrap();
119 let mut region_id_to_region_requests = region_requests
120 .normal_requests
121 .requests
122 .into_iter()
123 .map(|r| (r.region_id, r))
124 .collect::<HashMap<_, _>>();
125
126 let region_id = RegionId::new(1, 1).as_u64();
127 let region_request = region_id_to_region_requests.remove(®ion_id).unwrap();
128 assert_eq!(
129 region_request,
130 build_region_request(vec![Some(101)], region_id, versions[®ion_id])
131 );
132
133 let region_id = RegionId::new(1, 2).as_u64();
134 let region_request = region_id_to_region_requests.remove(®ion_id).unwrap();
135 assert_eq!(
136 region_request,
137 build_region_request(vec![Some(11)], region_id, versions[®ion_id])
138 );
139
140 let region_id = RegionId::new(1, 3).as_u64();
141 let region_request = region_id_to_region_requests.remove(®ion_id).unwrap();
142 assert_eq!(
143 region_request,
144 build_region_request(vec![Some(1), None], region_id, versions[®ion_id])
145 );
146 }
147
148 fn build_table_request(vector: VectorRef) -> TableInsertRequest {
149 TableInsertRequest {
150 catalog_name: DEFAULT_CATALOG_NAME.to_string(),
151 schema_name: DEFAULT_SCHEMA_NAME.to_string(),
152 table_name: "table_1".to_string(),
153 columns_values: HashMap::from([("a".to_string(), vector)]),
154 }
155 }
156
157 fn build_region_request(
158 rows: Vec<Option<i32>>,
159 region_id: u64,
160 version: Option<u64>,
161 ) -> RegionInsertRequest {
162 RegionInsertRequest {
163 region_id,
164 rows: Some(Rows {
165 schema: vec![tag_column_schema("a", ColumnDataType::Int32)],
166 rows: rows
167 .into_iter()
168 .map(|v| Row {
169 values: vec![Value {
170 value_data: v.map(ValueData::I32Value),
171 }],
172 })
173 .collect(),
174 }),
175 partition_expr_version: version.map(|value| PartitionExprVersion { value }),
176 }
177 }
178}