operator/req_convert/insert/
table_to_region.rs1use api::v1::region::InsertRequests as RegionInsertRequests;
16use api::v1::Rows;
17use partition::manager::PartitionRuleManager;
18use table::metadata::TableInfo;
19use table::requests::InsertRequest as TableInsertRequest;
20
21use crate::error::Result;
22use crate::insert::InstantAndNormalInsertRequests;
23use crate::req_convert::common::partitioner::Partitioner;
24use crate::req_convert::common::{column_schema, row_count};
25
26pub struct TableToRegion<'a> {
27 table_info: &'a TableInfo,
28 partition_manager: &'a PartitionRuleManager,
29}
30
31impl<'a> TableToRegion<'a> {
32 pub fn new(table_info: &'a TableInfo, partition_manager: &'a PartitionRuleManager) -> Self {
33 Self {
34 table_info,
35 partition_manager,
36 }
37 }
38
39 pub async fn convert(
40 &self,
41 request: TableInsertRequest,
42 ) -> Result<InstantAndNormalInsertRequests> {
43 let row_count = row_count(&request.columns_values)?;
44 let schema = column_schema(self.table_info, &request.columns_values)?;
45 let rows = api::helper::vectors_to_rows(request.columns_values.values(), row_count);
46
47 let rows = Rows { schema, rows };
48 let requests = Partitioner::new(self.partition_manager)
49 .partition_insert_requests(self.table_info, rows)
50 .await?;
51
52 let requests = RegionInsertRequests { requests };
53 if self.table_info.is_ttl_instant_table() {
54 Ok(InstantAndNormalInsertRequests {
55 normal_requests: Default::default(),
56 instant_requests: requests,
57 })
58 } else {
59 Ok(InstantAndNormalInsertRequests {
60 normal_requests: requests,
61 instant_requests: Default::default(),
62 })
63 }
64 }
65}
66
67#[cfg(test)]
68mod tests {
69 use std::collections::HashMap;
70 use std::sync::Arc;
71
72 use api::v1::helper::tag_column_schema;
73 use api::v1::region::InsertRequest as RegionInsertRequest;
74 use api::v1::value::ValueData;
75 use api::v1::{ColumnDataType, Row, Value};
76 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
77 use datatypes::vectors::{Int32Vector, VectorRef};
78 use store_api::storage::RegionId;
79
80 use super::*;
81 use crate::tests::{
82 create_partition_rule_manager, new_test_table_info, prepare_mocked_backend,
83 };
84
85 #[tokio::test]
86 async fn test_insert_request_table_to_region() {
87 let backend = prepare_mocked_backend().await;
98 let partition_manager = create_partition_rule_manager(backend.clone()).await;
99 let table_info = new_test_table_info(1, "table_1", vec![0u32, 1, 2].into_iter());
100
101 let converter = TableToRegion::new(&table_info, &partition_manager);
102
103 let table_request = build_table_request(Arc::new(Int32Vector::from(vec![
104 Some(1),
105 None,
106 Some(11),
107 Some(101),
108 ])));
109
110 let region_requests = converter.convert(table_request).await.unwrap();
111 let mut region_id_to_region_requests = region_requests
112 .normal_requests
113 .requests
114 .into_iter()
115 .map(|r| (r.region_id, r))
116 .collect::<HashMap<_, _>>();
117
118 let region_id = RegionId::new(1, 1).as_u64();
119 let region_request = region_id_to_region_requests.remove(®ion_id).unwrap();
120 assert_eq!(
121 region_request,
122 build_region_request(vec![Some(101)], region_id)
123 );
124
125 let region_id = RegionId::new(1, 2).as_u64();
126 let region_request = region_id_to_region_requests.remove(®ion_id).unwrap();
127 assert_eq!(
128 region_request,
129 build_region_request(vec![Some(11)], region_id)
130 );
131
132 let region_id = RegionId::new(1, 3).as_u64();
133 let region_request = region_id_to_region_requests.remove(®ion_id).unwrap();
134 assert_eq!(
135 region_request,
136 build_region_request(vec![Some(1), None], region_id)
137 );
138 }
139
140 fn build_table_request(vector: VectorRef) -> TableInsertRequest {
141 TableInsertRequest {
142 catalog_name: DEFAULT_CATALOG_NAME.to_string(),
143 schema_name: DEFAULT_SCHEMA_NAME.to_string(),
144 table_name: "table_1".to_string(),
145 columns_values: HashMap::from([("a".to_string(), vector)]),
146 }
147 }
148
149 fn build_region_request(rows: Vec<Option<i32>>, region_id: u64) -> RegionInsertRequest {
150 RegionInsertRequest {
151 region_id,
152 rows: Some(Rows {
153 schema: vec![tag_column_schema("a", ColumnDataType::Int32)],
154 rows: rows
155 .into_iter()
156 .map(|v| Row {
157 values: vec![Value {
158 value_data: v.map(ValueData::I32Value),
159 }],
160 })
161 .collect(),
162 }),
163 }
164 }
165}