cli/metadata/repair/
create_table.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use client::api::v1::region::{region_request, CreateRequests, RegionRequest, RegionRequestHeader};
18use client::api::v1::CreateTableExpr;
19use common_meta::ddl::create_logical_tables::create_region_request_builder;
20use common_meta::ddl::utils::region_storage_path;
21use common_meta::peer::Peer;
22use common_meta::rpc::router::{find_leader_regions, RegionRoute};
23use operator::expr_helper::column_schemas_to_defs;
24use snafu::ResultExt;
25use store_api::storage::{RegionId, TableId};
26use table::metadata::RawTableInfo;
27
28use crate::error::{CovertColumnSchemasToDefsSnafu, Result};
29
30/// Generates a `CreateTableExpr` from a `RawTableInfo`.
31pub fn generate_create_table_expr(table_info: &RawTableInfo) -> Result<CreateTableExpr> {
32    let schema = &table_info.meta.schema;
33    let primary_keys = table_info
34        .meta
35        .primary_key_indices
36        .iter()
37        .map(|i| schema.column_schemas[*i].name.clone())
38        .collect::<Vec<_>>();
39
40    let timestamp_index = schema.timestamp_index.as_ref().unwrap();
41    let time_index = schema.column_schemas[*timestamp_index].name.clone();
42    let column_defs = column_schemas_to_defs(schema.column_schemas.clone(), &primary_keys)
43        .context(CovertColumnSchemasToDefsSnafu)?;
44    let table_options = HashMap::from(&table_info.meta.options);
45
46    Ok(CreateTableExpr {
47        catalog_name: table_info.catalog_name.to_string(),
48        schema_name: table_info.schema_name.to_string(),
49        table_name: table_info.name.to_string(),
50        desc: String::default(),
51        column_defs,
52        time_index,
53        primary_keys,
54        create_if_not_exists: true,
55        table_options,
56        table_id: None,
57        engine: table_info.meta.engine.to_string(),
58    })
59}
60
61/// Makes a create region request for a peer.
62pub fn make_create_region_request_for_peer(
63    logical_table_id: TableId,
64    physical_table_id: TableId,
65    create_table_expr: &CreateTableExpr,
66    peer: &Peer,
67    region_routes: &[RegionRoute],
68) -> Result<RegionRequest> {
69    let regions_on_this_peer = find_leader_regions(region_routes, peer);
70    let mut requests = Vec::with_capacity(regions_on_this_peer.len());
71    let request_builder =
72        create_region_request_builder(create_table_expr, physical_table_id).unwrap();
73
74    let catalog = &create_table_expr.catalog_name;
75    let schema = &create_table_expr.schema_name;
76    let storage_path = region_storage_path(catalog, schema);
77
78    for region_number in &regions_on_this_peer {
79        let region_id = RegionId::new(logical_table_id, *region_number);
80        let region_request =
81            request_builder.build_one(region_id, storage_path.clone(), &HashMap::new());
82        requests.push(region_request);
83    }
84
85    Ok(RegionRequest {
86        header: Some(RegionRequestHeader::default()),
87        body: Some(region_request::Body::Creates(CreateRequests { requests })),
88    })
89}