operator/req_convert/delete/
table_to_region.rs1use api::v1::Rows;
16use api::v1::region::DeleteRequests as RegionDeleteRequests;
17use partition::manager::PartitionRuleManager;
18use table::metadata::TableInfo;
19use table::requests::DeleteRequest as TableDeleteRequest;
20
21use crate::error::Result;
22use crate::req_convert::common::partitioner::Partitioner;
23use crate::req_convert::common::{column_schema, row_count};
24
25pub struct TableToRegion<'a> {
26 table_info: &'a TableInfo,
27 partition_manager: &'a PartitionRuleManager,
28}
29
30impl<'a> TableToRegion<'a> {
31 pub fn new(table_info: &'a TableInfo, partition_manager: &'a PartitionRuleManager) -> Self {
32 Self {
33 table_info,
34 partition_manager,
35 }
36 }
37
38 pub async fn convert(&self, request: TableDeleteRequest) -> Result<RegionDeleteRequests> {
39 let row_count = row_count(&request.key_column_values)?;
40 let schema = column_schema(self.table_info, &request.key_column_values)?;
41 let rows = api::helper::vectors_to_rows(request.key_column_values.values(), row_count);
42 let rows = Rows { schema, rows };
43
44 let requests = Partitioner::new(self.partition_manager)
45 .partition_delete_requests(self.table_info, rows)
46 .await?;
47 Ok(RegionDeleteRequests { requests })
48 }
49}
50
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use api::v1::helper::tag_column_schema;
    use api::v1::region::DeleteRequest as RegionDeleteRequest;
    use api::v1::value::ValueData;
    use api::v1::{ColumnDataType, PartitionExprVersion, Row, Value};
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use datatypes::vectors::{Int32Vector, VectorRef};
    use store_api::storage::RegionId;

    use super::*;
    use crate::tests::{
        create_partition_rule_manager, new_test_table_info, prepare_mocked_backend,
    };

    /// Checks that a table-level delete over key column `a` is split into one
    /// request per region, each carrying the expected rows and the partition
    /// expression version reported for that region.
    #[tokio::test]
    async fn test_delete_request_table_to_region() {
        let backend = prepare_mocked_backend().await;
        let partition_manager = create_partition_rule_manager(backend.clone()).await;
        let table_info = new_test_table_info(1, "table_1", vec![0u32, 1, 2].into_iter());

        let converter = TableToRegion::new(&table_info, &partition_manager);

        let table_request = build_table_request(Arc::new(Int32Vector::from(vec![
            Some(1),
            None,
            Some(11),
            Some(101),
        ])));

        // Partition expression version per region id, as reported by the
        // partition manager (the argument `1` is presumably the table id used
        // for `table_info` above).
        let versions = partition_manager
            .find_physical_partition_info(1)
            .await
            .unwrap()
            .partitions
            .iter()
            .map(|p| (p.id.as_u64(), p.partition_expr_version))
            .collect::<HashMap<_, _>>();

        let region_requests = converter.convert(table_request).await.unwrap();
        let mut region_id_to_region_requests = region_requests
            .requests
            .into_iter()
            .map(|r| (r.region_id, r))
            .collect::<HashMap<_, _>>();

        // Expected routing of the input values: (region number, rows).
        // 101 lands in region 1, 11 in region 2, and both 1 and NULL in region 3.
        let expectations = [
            (1, vec![Some(101)]),
            (2, vec![Some(11)]),
            (3, vec![Some(1), None]),
        ];
        for (region_number, expected_rows) in expectations {
            let region_id = RegionId::new(1, region_number).as_u64();
            let region_request = region_id_to_region_requests
                .remove(&region_id)
                .unwrap_or_else(|| panic!("missing delete request for region {region_number}"));
            assert_eq!(
                region_request,
                build_region_request(expected_rows, region_id, versions[&region_id]),
                "unexpected delete request for region {region_number}"
            );
        }
    }

    /// Builds a table-level delete request on `table_1` in the default
    /// catalog/schema whose only key column is `a`.
    fn build_table_request(vector: VectorRef) -> TableDeleteRequest {
        TableDeleteRequest {
            catalog_name: DEFAULT_CATALOG_NAME.to_string(),
            schema_name: DEFAULT_SCHEMA_NAME.to_string(),
            table_name: "table_1".to_string(),
            key_column_values: HashMap::from([("a".to_string(), vector)]),
        }
    }

    /// Builds the region-level delete request expected for `region_id`: one
    /// single-column row per entry of `rows` (`None` becomes a value without
    /// data), with the optional partition expression `version` attached.
    fn build_region_request(
        rows: Vec<Option<i32>>,
        region_id: u64,
        version: Option<u64>,
    ) -> RegionDeleteRequest {
        RegionDeleteRequest {
            region_id,
            rows: Some(Rows {
                schema: vec![tag_column_schema("a", ColumnDataType::Int32)],
                rows: rows
                    .into_iter()
                    .map(|v| Row {
                        values: vec![Value {
                            value_data: v.map(ValueData::I32Value),
                        }],
                    })
                    .collect(),
            }),
            partition_expr_version: version.map(|value| PartitionExprVersion { value }),
        }
    }
}