operator/req_convert/insert/
table_to_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::region::InsertRequests as RegionInsertRequests;
16use api::v1::Rows;
17use partition::manager::PartitionRuleManager;
18use table::metadata::TableInfo;
19use table::requests::InsertRequest as TableInsertRequest;
20
21use crate::error::Result;
22use crate::insert::InstantAndNormalInsertRequests;
23use crate::req_convert::common::partitioner::Partitioner;
24use crate::req_convert::common::{column_schema, row_count};
25
26pub struct TableToRegion<'a> {
27    table_info: &'a TableInfo,
28    partition_manager: &'a PartitionRuleManager,
29}
30
31impl<'a> TableToRegion<'a> {
32    pub fn new(table_info: &'a TableInfo, partition_manager: &'a PartitionRuleManager) -> Self {
33        Self {
34            table_info,
35            partition_manager,
36        }
37    }
38
39    pub async fn convert(
40        &self,
41        request: TableInsertRequest,
42    ) -> Result<InstantAndNormalInsertRequests> {
43        let row_count = row_count(&request.columns_values)?;
44        let schema = column_schema(self.table_info, &request.columns_values)?;
45        let rows = api::helper::vectors_to_rows(request.columns_values.values(), row_count);
46
47        let rows = Rows { schema, rows };
48        let requests = Partitioner::new(self.partition_manager)
49            .partition_insert_requests(self.table_info.table_id(), rows)
50            .await?;
51
52        let requests = RegionInsertRequests { requests };
53        if self.table_info.is_ttl_instant_table() {
54            Ok(InstantAndNormalInsertRequests {
55                normal_requests: Default::default(),
56                instant_requests: requests,
57            })
58        } else {
59            Ok(InstantAndNormalInsertRequests {
60                normal_requests: requests,
61                instant_requests: Default::default(),
62            })
63        }
64    }
65}
66
67#[cfg(test)]
68mod tests {
69    use std::collections::HashMap;
70    use std::sync::Arc;
71
72    use api::v1::region::InsertRequest as RegionInsertRequest;
73    use api::v1::value::ValueData;
74    use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType, Value};
75    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
76    use common_meta::key::catalog_name::{CatalogManager, CatalogNameKey};
77    use common_meta::key::schema_name::{SchemaManager, SchemaNameKey};
78    use common_meta::kv_backend::memory::MemoryKvBackend;
79    use common_meta::kv_backend::KvBackendRef;
80    use datatypes::vectors::{Int32Vector, VectorRef};
81    use store_api::storage::RegionId;
82
83    use super::*;
84    use crate::tests::{create_partition_rule_manager, new_test_table_info};
85
86    async fn prepare_mocked_backend() -> KvBackendRef {
87        let backend = Arc::new(MemoryKvBackend::default());
88
89        let catalog_manager = CatalogManager::new(backend.clone());
90        let schema_manager = SchemaManager::new(backend.clone());
91
92        catalog_manager
93            .create(CatalogNameKey::default(), false)
94            .await
95            .unwrap();
96        schema_manager
97            .create(SchemaNameKey::default(), None, false)
98            .await
99            .unwrap();
100
101        backend
102    }
103
104    #[tokio::test]
105    async fn test_insert_request_table_to_region() {
106        // region to datanode placement:
107        // 1 -> 1
108        // 2 -> 2
109        // 3 -> 3
110        //
111        // region value ranges:
112        // 1 -> [50, max)
113        // 2 -> [10, 50)
114        // 3 -> (min, 10)
115
116        let backend = prepare_mocked_backend().await;
117        let partition_manager = create_partition_rule_manager(backend.clone()).await;
118        let table_info = new_test_table_info(1, "table_1", vec![0u32, 1, 2].into_iter());
119
120        let converter = TableToRegion::new(&table_info, &partition_manager);
121
122        let table_request = build_table_request(Arc::new(Int32Vector::from(vec![
123            Some(1),
124            None,
125            Some(11),
126            Some(101),
127        ])));
128
129        let region_requests = converter.convert(table_request).await.unwrap();
130        let mut region_id_to_region_requests = region_requests
131            .normal_requests
132            .requests
133            .into_iter()
134            .map(|r| (r.region_id, r))
135            .collect::<HashMap<_, _>>();
136
137        let region_id = RegionId::new(1, 1).as_u64();
138        let region_request = region_id_to_region_requests.remove(&region_id).unwrap();
139        assert_eq!(
140            region_request,
141            build_region_request(vec![Some(101)], region_id)
142        );
143
144        let region_id = RegionId::new(1, 2).as_u64();
145        let region_request = region_id_to_region_requests.remove(&region_id).unwrap();
146        assert_eq!(
147            region_request,
148            build_region_request(vec![Some(11)], region_id)
149        );
150
151        let region_id = RegionId::new(1, 3).as_u64();
152        let region_request = region_id_to_region_requests.remove(&region_id).unwrap();
153        assert_eq!(
154            region_request,
155            build_region_request(vec![Some(1), None], region_id)
156        );
157    }
158
159    fn build_table_request(vector: VectorRef) -> TableInsertRequest {
160        TableInsertRequest {
161            catalog_name: DEFAULT_CATALOG_NAME.to_string(),
162            schema_name: DEFAULT_SCHEMA_NAME.to_string(),
163            table_name: "table_1".to_string(),
164            columns_values: HashMap::from([("a".to_string(), vector)]),
165        }
166    }
167
168    fn build_region_request(rows: Vec<Option<i32>>, region_id: u64) -> RegionInsertRequest {
169        RegionInsertRequest {
170            region_id,
171            rows: Some(Rows {
172                schema: vec![ColumnSchema {
173                    column_name: "a".to_string(),
174                    datatype: ColumnDataType::Int32 as i32,
175                    semantic_type: SemanticType::Tag as i32,
176                    ..Default::default()
177                }],
178                rows: rows
179                    .into_iter()
180                    .map(|v| Row {
181                        values: vec![Value {
182                            value_data: v.map(ValueData::I32Value),
183                        }],
184                    })
185                    .collect(),
186            }),
187        }
188    }
189}