common_meta/ddl/create_logical_tables/
region_request.rs1use std::collections::HashMap;
16
17use api::v1::region::{region_request, CreateRequests, RegionRequest, RegionRequestHeader};
18use api::v1::CreateTableExpr;
19use common_telemetry::debug;
20use common_telemetry::tracing_context::TracingContext;
21use store_api::storage::{RegionId, TableId};
22
23use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
24use crate::ddl::create_table_template::{build_template, CreateRequestBuilder};
25use crate::ddl::utils::region_storage_path;
26use crate::error::Result;
27use crate::peer::Peer;
28use crate::rpc::router::{find_leader_regions, RegionRoute};
29
30impl CreateLogicalTablesProcedure {
31 pub(crate) fn make_request(
32 &self,
33 peer: &Peer,
34 region_routes: &[RegionRoute],
35 ) -> Result<Option<RegionRequest>> {
36 let tasks = &self.data.tasks;
37 let table_ids_already_exists = &self.data.table_ids_already_exists;
38 let regions_on_this_peer = find_leader_regions(region_routes, peer);
39 let mut requests = Vec::with_capacity(tasks.len() * regions_on_this_peer.len());
40 for (task, table_id_already_exists) in tasks.iter().zip(table_ids_already_exists) {
41 if table_id_already_exists.is_some() {
42 continue;
43 }
44 let create_table_expr = &task.create_table;
45 let catalog = &create_table_expr.catalog_name;
46 let schema = &create_table_expr.schema_name;
47 let logical_table_id = task.table_info.ident.table_id;
48 let physical_table_id = self.data.physical_table_id;
49 let storage_path = region_storage_path(catalog, schema);
50 let request_builder =
51 create_region_request_builder(&task.create_table, physical_table_id)?;
52
53 for region_number in ®ions_on_this_peer {
54 let region_id = RegionId::new(logical_table_id, *region_number);
55 let one_region_request =
56 request_builder.build_one(region_id, storage_path.clone(), &HashMap::new());
57 requests.push(one_region_request);
58 }
59 }
60
61 if requests.is_empty() {
62 debug!("no region request to send to datanodes");
63 return Ok(None);
64 }
65
66 Ok(Some(RegionRequest {
67 header: Some(RegionRequestHeader {
68 tracing_context: TracingContext::from_current_span().to_w3c(),
69 ..Default::default()
70 }),
71 body: Some(region_request::Body::Creates(CreateRequests { requests })),
72 }))
73 }
74}
75
76pub fn create_region_request_builder(
78 create_table_expr: &CreateTableExpr,
79 physical_table_id: TableId,
80) -> Result<CreateRequestBuilder> {
81 let template = build_template(create_table_expr)?;
82 Ok(CreateRequestBuilder::new(template, Some(physical_table_id)))
83}