common_meta/ddl/create_logical_tables/
region_request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use api::v1::region::{region_request, CreateRequests, RegionRequest, RegionRequestHeader};
18use common_telemetry::debug;
19use common_telemetry::tracing_context::TracingContext;
20use store_api::storage::RegionId;
21
22use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
23use crate::ddl::create_table_template::{build_template, CreateRequestBuilder};
24use crate::ddl::utils::region_storage_path;
25use crate::error::Result;
26use crate::peer::Peer;
27use crate::rpc::ddl::CreateTableTask;
28use crate::rpc::router::{find_leader_regions, RegionRoute};
29
30impl CreateLogicalTablesProcedure {
31    pub(crate) fn make_request(
32        &self,
33        peer: &Peer,
34        region_routes: &[RegionRoute],
35    ) -> Result<Option<RegionRequest>> {
36        let tasks = &self.data.tasks;
37        let table_ids_already_exists = &self.data.table_ids_already_exists;
38        let regions_on_this_peer = find_leader_regions(region_routes, peer);
39        let mut requests = Vec::with_capacity(tasks.len() * regions_on_this_peer.len());
40        for (task, table_id_already_exists) in tasks.iter().zip(table_ids_already_exists) {
41            if table_id_already_exists.is_some() {
42                continue;
43            }
44            let create_table_expr = &task.create_table;
45            let catalog = &create_table_expr.catalog_name;
46            let schema = &create_table_expr.schema_name;
47            let logical_table_id = task.table_info.ident.table_id;
48            let storage_path = region_storage_path(catalog, schema);
49            let request_builder = self.create_region_request_builder(task)?;
50
51            for region_number in &regions_on_this_peer {
52                let region_id = RegionId::new(logical_table_id, *region_number);
53                let one_region_request =
54                    request_builder.build_one(region_id, storage_path.clone(), &HashMap::new())?;
55                requests.push(one_region_request);
56            }
57        }
58
59        if requests.is_empty() {
60            debug!("no region request to send to datanodes");
61            return Ok(None);
62        }
63
64        Ok(Some(RegionRequest {
65            header: Some(RegionRequestHeader {
66                tracing_context: TracingContext::from_current_span().to_w3c(),
67                ..Default::default()
68            }),
69            body: Some(region_request::Body::Creates(CreateRequests { requests })),
70        }))
71    }
72
73    fn create_region_request_builder(
74        &self,
75        task: &CreateTableTask,
76    ) -> Result<CreateRequestBuilder> {
77        let create_expr = &task.create_table;
78        let template = build_template(create_expr)?;
79        Ok(CreateRequestBuilder::new(
80            template,
81            Some(self.data.physical_table_id),
82        ))
83    }
84}