cli/metadata/repair/
create_table.rs1use std::collections::HashMap;
16
17use client::api::v1::region::{region_request, CreateRequests, RegionRequest, RegionRequestHeader};
18use client::api::v1::CreateTableExpr;
19use common_meta::ddl::create_logical_tables::create_region_request_builder;
20use common_meta::ddl::utils::region_storage_path;
21use common_meta::peer::Peer;
22use common_meta::rpc::router::{find_leader_regions, RegionRoute};
23use operator::expr_helper::column_schemas_to_defs;
24use snafu::ResultExt;
25use store_api::storage::{RegionId, TableId};
26use table::metadata::RawTableInfo;
27
28use crate::error::{CovertColumnSchemasToDefsSnafu, Result};
29
30pub fn generate_create_table_expr(table_info: &RawTableInfo) -> Result<CreateTableExpr> {
32 let schema = &table_info.meta.schema;
33 let primary_keys = table_info
34 .meta
35 .primary_key_indices
36 .iter()
37 .map(|i| schema.column_schemas[*i].name.clone())
38 .collect::<Vec<_>>();
39
40 let timestamp_index = schema.timestamp_index.as_ref().unwrap();
41 let time_index = schema.column_schemas[*timestamp_index].name.clone();
42 let column_defs = column_schemas_to_defs(schema.column_schemas.clone(), &primary_keys)
43 .context(CovertColumnSchemasToDefsSnafu)?;
44 let table_options = HashMap::from(&table_info.meta.options);
45
46 Ok(CreateTableExpr {
47 catalog_name: table_info.catalog_name.to_string(),
48 schema_name: table_info.schema_name.to_string(),
49 table_name: table_info.name.to_string(),
50 desc: String::default(),
51 column_defs,
52 time_index,
53 primary_keys,
54 create_if_not_exists: true,
55 table_options,
56 table_id: None,
57 engine: table_info.meta.engine.to_string(),
58 })
59}
60
61pub fn make_create_region_request_for_peer(
63 logical_table_id: TableId,
64 physical_table_id: TableId,
65 create_table_expr: &CreateTableExpr,
66 peer: &Peer,
67 region_routes: &[RegionRoute],
68) -> Result<RegionRequest> {
69 let regions_on_this_peer = find_leader_regions(region_routes, peer);
70 let mut requests = Vec::with_capacity(regions_on_this_peer.len());
71 let request_builder =
72 create_region_request_builder(create_table_expr, physical_table_id).unwrap();
73
74 let catalog = &create_table_expr.catalog_name;
75 let schema = &create_table_expr.schema_name;
76 let storage_path = region_storage_path(catalog, schema);
77 let partition_exprs = region_routes
78 .iter()
79 .map(|r| (r.region.id.region_number(), r.region.partition_expr()))
80 .collect::<HashMap<_, _>>();
81
82 for region_number in ®ions_on_this_peer {
83 let region_id = RegionId::new(logical_table_id, *region_number);
84 let region_request = request_builder.build_one(
85 region_id,
86 storage_path.clone(),
87 &HashMap::new(),
88 &partition_exprs,
89 );
90 requests.push(region_request);
91 }
92
93 Ok(RegionRequest {
94 header: Some(RegionRequestHeader::default()),
95 body: Some(region_request::Body::Creates(CreateRequests { requests })),
96 })
97}