common_meta/ddl/create_logical_tables/
region_request.rs1use std::collections::HashMap;
16
17use api::v1::region::{region_request, CreateRequests, RegionRequest, RegionRequestHeader};
18use api::v1::CreateTableExpr;
19use common_telemetry::debug;
20use common_telemetry::tracing_context::TracingContext;
21use store_api::storage::{RegionId, TableId};
22use table::metadata::RawTableInfo;
23
24use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
25use crate::ddl::create_table_template::{
26 build_template, build_template_from_raw_table_info, CreateRequestBuilder,
27};
28use crate::ddl::utils::region_storage_path;
29use crate::error::Result;
30use crate::peer::Peer;
31use crate::rpc::router::{find_leader_regions, RegionRoute};
32
33impl CreateLogicalTablesProcedure {
34 pub(crate) fn make_request(
35 &self,
36 peer: &Peer,
37 region_routes: &[RegionRoute],
38 ) -> Result<Option<RegionRequest>> {
39 let tasks = &self.data.tasks;
40 let table_ids_already_exists = &self.data.table_ids_already_exists;
41 let regions_on_this_peer = find_leader_regions(region_routes, peer);
42 let mut requests = Vec::with_capacity(tasks.len() * regions_on_this_peer.len());
43 let partition_exprs = region_routes
44 .iter()
45 .map(|r| (r.region.id.region_number(), r.region.partition_expr()))
46 .collect();
47 for (task, table_id_already_exists) in tasks.iter().zip(table_ids_already_exists) {
48 if table_id_already_exists.is_some() {
49 continue;
50 }
51 let create_table_expr = &task.create_table;
52 let catalog = &create_table_expr.catalog_name;
53 let schema = &create_table_expr.schema_name;
54 let logical_table_id = task.table_info.ident.table_id;
55 let physical_table_id = self.data.physical_table_id;
56 let storage_path = region_storage_path(catalog, schema);
57 let request_builder = create_region_request_builder_from_raw_table_info(
58 &task.table_info,
59 physical_table_id,
60 )?;
61
62 for region_number in ®ions_on_this_peer {
63 let region_id = RegionId::new(logical_table_id, *region_number);
64 let one_region_request = request_builder.build_one(
65 region_id,
66 storage_path.clone(),
67 &HashMap::new(),
68 &partition_exprs,
69 );
70 requests.push(one_region_request);
71 }
72 }
73
74 if requests.is_empty() {
75 debug!("no region request to send to datanodes");
76 return Ok(None);
77 }
78
79 Ok(Some(RegionRequest {
80 header: Some(RegionRequestHeader {
81 tracing_context: TracingContext::from_current_span().to_w3c(),
82 ..Default::default()
83 }),
84 body: Some(region_request::Body::Creates(CreateRequests { requests })),
85 }))
86 }
87}
88
89pub fn create_region_request_builder(
91 create_table_expr: &CreateTableExpr,
92 physical_table_id: TableId,
93) -> Result<CreateRequestBuilder> {
94 let template = build_template(create_table_expr)?;
95 Ok(CreateRequestBuilder::new(template, Some(physical_table_id)))
96}
97
98pub fn create_region_request_builder_from_raw_table_info(
102 raw_table_info: &RawTableInfo,
103 physical_table_id: TableId,
104) -> Result<CreateRequestBuilder> {
105 let template = build_template_from_raw_table_info(raw_table_info)?;
106 Ok(CreateRequestBuilder::new(template, Some(physical_table_id)))
107}