operator/req_convert/delete/
table_to_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::region::DeleteRequests as RegionDeleteRequests;
16use api::v1::Rows;
17use partition::manager::PartitionRuleManager;
18use table::metadata::TableInfo;
19use table::requests::DeleteRequest as TableDeleteRequest;
20
21use crate::error::Result;
22use crate::req_convert::common::partitioner::Partitioner;
23use crate::req_convert::common::{column_schema, row_count};
24
25pub struct TableToRegion<'a> {
26    table_info: &'a TableInfo,
27    partition_manager: &'a PartitionRuleManager,
28}
29
30impl<'a> TableToRegion<'a> {
31    pub fn new(table_info: &'a TableInfo, partition_manager: &'a PartitionRuleManager) -> Self {
32        Self {
33            table_info,
34            partition_manager,
35        }
36    }
37
38    pub async fn convert(&self, request: TableDeleteRequest) -> Result<RegionDeleteRequests> {
39        let row_count = row_count(&request.key_column_values)?;
40        let schema = column_schema(self.table_info, &request.key_column_values)?;
41        let rows = api::helper::vectors_to_rows(request.key_column_values.values(), row_count);
42        let rows = Rows { schema, rows };
43
44        let requests = Partitioner::new(self.partition_manager)
45            .partition_delete_requests(self.table_info.table_id(), rows)
46            .await?;
47        Ok(RegionDeleteRequests { requests })
48    }
49}
50
51#[cfg(test)]
52mod tests {
53    use std::collections::HashMap;
54    use std::sync::Arc;
55
56    use api::v1::region::DeleteRequest as RegionDeleteRequest;
57    use api::v1::value::ValueData;
58    use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType, Value};
59    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
60    use common_meta::key::catalog_name::{CatalogManager, CatalogNameKey};
61    use common_meta::key::schema_name::{SchemaManager, SchemaNameKey};
62    use common_meta::kv_backend::memory::MemoryKvBackend;
63    use common_meta::kv_backend::KvBackendRef;
64    use datatypes::vectors::{Int32Vector, VectorRef};
65    use store_api::storage::RegionId;
66
67    use super::*;
68    use crate::tests::{create_partition_rule_manager, new_test_table_info};
69
70    async fn prepare_mocked_backend() -> KvBackendRef {
71        let backend = Arc::new(MemoryKvBackend::default());
72
73        let catalog_manager = CatalogManager::new(backend.clone());
74        let schema_manager = SchemaManager::new(backend.clone());
75
76        catalog_manager
77            .create(CatalogNameKey::default(), false)
78            .await
79            .unwrap();
80        schema_manager
81            .create(SchemaNameKey::default(), None, false)
82            .await
83            .unwrap();
84
85        backend
86    }
87
88    #[tokio::test]
89    async fn test_delete_request_table_to_region() {
90        // region to datanode placement:
91        // 1 -> 1
92        // 2 -> 2
93        // 3 -> 3
94        //
95        // region value ranges:
96        // 1 -> [50, max)
97        // 2 -> [10, 50)
98        // 3 -> (min, 10)
99
100        let backend = prepare_mocked_backend().await;
101        let partition_manager = create_partition_rule_manager(backend.clone()).await;
102        let table_info = new_test_table_info(1, "table_1", vec![0u32, 1, 2].into_iter());
103
104        let converter = TableToRegion::new(&table_info, &partition_manager);
105
106        let table_request = build_table_request(Arc::new(Int32Vector::from(vec![
107            Some(1),
108            None,
109            Some(11),
110            Some(101),
111        ])));
112
113        let region_requests = converter.convert(table_request).await.unwrap();
114        let mut region_id_to_region_requests = region_requests
115            .requests
116            .into_iter()
117            .map(|r| (r.region_id, r))
118            .collect::<HashMap<_, _>>();
119
120        let region_id = RegionId::new(1, 1).as_u64();
121        let region_request = region_id_to_region_requests.remove(&region_id).unwrap();
122        assert_eq!(
123            region_request,
124            build_region_request(vec![Some(101)], region_id)
125        );
126
127        let region_id = RegionId::new(1, 2).as_u64();
128        let region_request = region_id_to_region_requests.remove(&region_id).unwrap();
129        assert_eq!(
130            region_request,
131            build_region_request(vec![Some(11)], region_id)
132        );
133
134        let region_id = RegionId::new(1, 3).as_u64();
135        let region_request = region_id_to_region_requests.remove(&region_id).unwrap();
136        assert_eq!(
137            region_request,
138            build_region_request(vec![Some(1), None], region_id)
139        );
140    }
141
142    fn build_table_request(vector: VectorRef) -> TableDeleteRequest {
143        TableDeleteRequest {
144            catalog_name: DEFAULT_CATALOG_NAME.to_string(),
145            schema_name: DEFAULT_SCHEMA_NAME.to_string(),
146            table_name: "table_1".to_string(),
147            key_column_values: HashMap::from([("a".to_string(), vector)]),
148        }
149    }
150
151    fn build_region_request(rows: Vec<Option<i32>>, region_id: u64) -> RegionDeleteRequest {
152        RegionDeleteRequest {
153            region_id,
154            rows: Some(Rows {
155                schema: vec![ColumnSchema {
156                    column_name: "a".to_string(),
157                    datatype: ColumnDataType::Int32 as i32,
158                    semantic_type: SemanticType::Tag as i32,
159                    ..Default::default()
160                }],
161                rows: rows
162                    .into_iter()
163                    .map(|v| Row {
164                        values: vec![Value {
165                            value_data: v.map(ValueData::I32Value),
166                        }],
167                    })
168                    .collect(),
169            }),
170        }
171    }
172}