operator/req_convert/delete/
table_to_region.rs1use api::v1::region::DeleteRequests as RegionDeleteRequests;
16use api::v1::Rows;
17use partition::manager::PartitionRuleManager;
18use table::metadata::TableInfo;
19use table::requests::DeleteRequest as TableDeleteRequest;
20
21use crate::error::Result;
22use crate::req_convert::common::partitioner::Partitioner;
23use crate::req_convert::common::{column_schema, row_count};
24
25pub struct TableToRegion<'a> {
26 table_info: &'a TableInfo,
27 partition_manager: &'a PartitionRuleManager,
28}
29
30impl<'a> TableToRegion<'a> {
31 pub fn new(table_info: &'a TableInfo, partition_manager: &'a PartitionRuleManager) -> Self {
32 Self {
33 table_info,
34 partition_manager,
35 }
36 }
37
38 pub async fn convert(&self, request: TableDeleteRequest) -> Result<RegionDeleteRequests> {
39 let row_count = row_count(&request.key_column_values)?;
40 let schema = column_schema(self.table_info, &request.key_column_values)?;
41 let rows = api::helper::vectors_to_rows(request.key_column_values.values(), row_count);
42 let rows = Rows { schema, rows };
43
44 let requests = Partitioner::new(self.partition_manager)
45 .partition_delete_requests(self.table_info.table_id(), rows)
46 .await?;
47 Ok(RegionDeleteRequests { requests })
48 }
49}
50
51#[cfg(test)]
52mod tests {
53 use std::collections::HashMap;
54 use std::sync::Arc;
55
56 use api::v1::region::DeleteRequest as RegionDeleteRequest;
57 use api::v1::value::ValueData;
58 use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType, Value};
59 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
60 use datatypes::vectors::{Int32Vector, VectorRef};
61 use store_api::storage::RegionId;
62
63 use super::*;
64 use crate::tests::{
65 create_partition_rule_manager, new_test_table_info, prepare_mocked_backend,
66 };
67
68 #[tokio::test]
69 async fn test_delete_request_table_to_region() {
70 let backend = prepare_mocked_backend().await;
81 let partition_manager = create_partition_rule_manager(backend.clone()).await;
82 let table_info = new_test_table_info(1, "table_1", vec![0u32, 1, 2].into_iter());
83
84 let converter = TableToRegion::new(&table_info, &partition_manager);
85
86 let table_request = build_table_request(Arc::new(Int32Vector::from(vec![
87 Some(1),
88 None,
89 Some(11),
90 Some(101),
91 ])));
92
93 let region_requests = converter.convert(table_request).await.unwrap();
94 let mut region_id_to_region_requests = region_requests
95 .requests
96 .into_iter()
97 .map(|r| (r.region_id, r))
98 .collect::<HashMap<_, _>>();
99
100 let region_id = RegionId::new(1, 1).as_u64();
101 let region_request = region_id_to_region_requests.remove(®ion_id).unwrap();
102 assert_eq!(
103 region_request,
104 build_region_request(vec![Some(101)], region_id)
105 );
106
107 let region_id = RegionId::new(1, 2).as_u64();
108 let region_request = region_id_to_region_requests.remove(®ion_id).unwrap();
109 assert_eq!(
110 region_request,
111 build_region_request(vec![Some(11)], region_id)
112 );
113
114 let region_id = RegionId::new(1, 3).as_u64();
115 let region_request = region_id_to_region_requests.remove(®ion_id).unwrap();
116 assert_eq!(
117 region_request,
118 build_region_request(vec![Some(1), None], region_id)
119 );
120 }
121
122 fn build_table_request(vector: VectorRef) -> TableDeleteRequest {
123 TableDeleteRequest {
124 catalog_name: DEFAULT_CATALOG_NAME.to_string(),
125 schema_name: DEFAULT_SCHEMA_NAME.to_string(),
126 table_name: "table_1".to_string(),
127 key_column_values: HashMap::from([("a".to_string(), vector)]),
128 }
129 }
130
131 fn build_region_request(rows: Vec<Option<i32>>, region_id: u64) -> RegionDeleteRequest {
132 RegionDeleteRequest {
133 region_id,
134 rows: Some(Rows {
135 schema: vec![ColumnSchema {
136 column_name: "a".to_string(),
137 datatype: ColumnDataType::Int32 as i32,
138 semantic_type: SemanticType::Tag as i32,
139 ..Default::default()
140 }],
141 rows: rows
142 .into_iter()
143 .map(|v| Row {
144 values: vec![Value {
145 value_data: v.map(ValueData::I32Value),
146 }],
147 })
148 .collect(),
149 }),
150 }
151 }
152}