operator/req_convert/delete/
table_to_region.rs1use api::v1::region::DeleteRequests as RegionDeleteRequests;
16use api::v1::Rows;
17use partition::manager::PartitionRuleManager;
18use table::metadata::TableInfo;
19use table::requests::DeleteRequest as TableDeleteRequest;
20
21use crate::error::Result;
22use crate::req_convert::common::partitioner::Partitioner;
23use crate::req_convert::common::{column_schema, row_count};
24
25pub struct TableToRegion<'a> {
26 table_info: &'a TableInfo,
27 partition_manager: &'a PartitionRuleManager,
28}
29
30impl<'a> TableToRegion<'a> {
31 pub fn new(table_info: &'a TableInfo, partition_manager: &'a PartitionRuleManager) -> Self {
32 Self {
33 table_info,
34 partition_manager,
35 }
36 }
37
38 pub async fn convert(&self, request: TableDeleteRequest) -> Result<RegionDeleteRequests> {
39 let row_count = row_count(&request.key_column_values)?;
40 let schema = column_schema(self.table_info, &request.key_column_values)?;
41 let rows = api::helper::vectors_to_rows(request.key_column_values.values(), row_count);
42 let rows = Rows { schema, rows };
43
44 let requests = Partitioner::new(self.partition_manager)
45 .partition_delete_requests(self.table_info.table_id(), rows)
46 .await?;
47 Ok(RegionDeleteRequests { requests })
48 }
49}
50
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use api::v1::region::DeleteRequest as RegionDeleteRequest;
    use api::v1::value::ValueData;
    use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType, Value};
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use common_meta::key::catalog_name::{CatalogManager, CatalogNameKey};
    use common_meta::key::schema_name::{SchemaManager, SchemaNameKey};
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::kv_backend::KvBackendRef;
    use datatypes::vectors::{Int32Vector, VectorRef};
    use store_api::storage::RegionId;

    use super::*;
    use crate::tests::{create_partition_rule_manager, new_test_table_info};

    /// Builds an in-memory KV backend and creates the entries for the
    /// `Default` catalog and schema name keys in it.
    async fn prepare_mocked_backend() -> KvBackendRef {
        let backend = Arc::new(MemoryKvBackend::default());

        let catalog_manager = CatalogManager::new(backend.clone());
        let schema_manager = SchemaManager::new(backend.clone());

        catalog_manager
            .create(CatalogNameKey::default(), false)
            .await
            .unwrap();
        schema_manager
            .create(SchemaNameKey::default(), None, false)
            .await
            .unwrap();

        backend
    }

    /// Converts a four-row delete on column `a` and checks which rows end up
    /// in the request of each of the three regions of table 1.
    #[tokio::test]
    async fn test_delete_request_table_to_region() {
        let backend = prepare_mocked_backend().await;
        let partition_manager = create_partition_rule_manager(backend.clone()).await;
        let table_info = new_test_table_info(1, "table_1", vec![0u32, 1, 2].into_iter());

        let converter = TableToRegion::new(&table_info, &partition_manager);

        let table_request = build_table_request(Arc::new(Int32Vector::from(vec![
            Some(1),
            None,
            Some(11),
            Some(101),
        ])));

        let region_requests = converter.convert(table_request).await.unwrap();
        // Index the produced requests by region id so each region can be
        // checked independently of the output order.
        let mut region_id_to_region_requests = region_requests
            .requests
            .into_iter()
            .map(|r| (r.region_id, r))
            .collect::<HashMap<_, _>>();

        // The expected routing below depends on the partition rule set up by
        // `create_partition_rule_manager`, which is defined elsewhere.
        let region_id = RegionId::new(1, 1).as_u64();
        let region_request = region_id_to_region_requests.remove(&region_id).unwrap();
        assert_eq!(
            region_request,
            build_region_request(vec![Some(101)], region_id)
        );

        let region_id = RegionId::new(1, 2).as_u64();
        let region_request = region_id_to_region_requests.remove(&region_id).unwrap();
        assert_eq!(
            region_request,
            build_region_request(vec![Some(11)], region_id)
        );

        let region_id = RegionId::new(1, 3).as_u64();
        let region_request = region_id_to_region_requests.remove(&region_id).unwrap();
        assert_eq!(
            region_request,
            build_region_request(vec![Some(1), None], region_id)
        );
    }

    /// Builds a delete request against `table_1` in the default catalog and
    /// schema, with `vector` as the values of the single key column `a`.
    fn build_table_request(vector: VectorRef) -> TableDeleteRequest {
        TableDeleteRequest {
            catalog_name: DEFAULT_CATALOG_NAME.to_string(),
            schema_name: DEFAULT_SCHEMA_NAME.to_string(),
            table_name: "table_1".to_string(),
            key_column_values: HashMap::from([("a".to_string(), vector)]),
        }
    }

    /// Builds the expected region delete request for `region_id`: a single
    /// `Int32` tag column `a` with one row per entry of `rows` (`None` maps
    /// to a value without `value_data`).
    fn build_region_request(rows: Vec<Option<i32>>, region_id: u64) -> RegionDeleteRequest {
        RegionDeleteRequest {
            region_id,
            rows: Some(Rows {
                schema: vec![ColumnSchema {
                    column_name: "a".to_string(),
                    datatype: ColumnDataType::Int32 as i32,
                    semantic_type: SemanticType::Tag as i32,
                    ..Default::default()
                }],
                rows: rows
                    .into_iter()
                    .map(|v| Row {
                        values: vec![Value {
                            value_data: v.map(ValueData::I32Value),
                        }],
                    })
                    .collect(),
            }),
        }
    }
}