cli/metadata/repair/
create_table.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use client::api::v1::CreateTableExpr;
18use client::api::v1::region::{CreateRequests, RegionRequest, RegionRequestHeader, region_request};
19use common_meta::ddl::create_logical_tables::create_region_request_builder;
20use common_meta::ddl::utils::region_storage_path;
21use common_meta::peer::Peer;
22use common_meta::rpc::router::{RegionRoute, find_leader_regions};
23use operator::expr_helper::column_schemas_to_defs;
24use snafu::ResultExt;
25use store_api::storage::{RegionId, TableId};
26use table::metadata::TableInfo;
27
28use crate::error::{CovertColumnSchemasToDefsSnafu, Result};
29
30/// Generates a `CreateTableExpr` from a `TableInfo`.
31pub fn generate_create_table_expr(table_info: &TableInfo) -> Result<CreateTableExpr> {
32    let schema = &table_info.meta.schema;
33    let column_schemas = schema.column_schemas();
34    let primary_keys = table_info
35        .meta
36        .primary_key_indices
37        .iter()
38        .map(|i| column_schemas[*i].name.clone())
39        .collect::<Vec<_>>();
40
41    let timestamp_index = schema.timestamp_index().unwrap();
42    let time_index = column_schemas[timestamp_index].name.clone();
43    let column_defs = column_schemas_to_defs(column_schemas.to_vec(), &primary_keys)
44        .context(CovertColumnSchemasToDefsSnafu)?;
45    let table_options = HashMap::from(&table_info.meta.options);
46
47    Ok(CreateTableExpr {
48        catalog_name: table_info.catalog_name.clone(),
49        schema_name: table_info.schema_name.clone(),
50        table_name: table_info.name.clone(),
51        desc: String::default(),
52        column_defs,
53        time_index,
54        primary_keys,
55        create_if_not_exists: true,
56        table_options,
57        table_id: None,
58        engine: table_info.meta.engine.clone(),
59    })
60}
61
62/// Makes a create region request for a peer.
63pub fn make_create_region_request_for_peer(
64    logical_table_id: TableId,
65    physical_table_id: TableId,
66    create_table_expr: &CreateTableExpr,
67    peer: &Peer,
68    region_routes: &[RegionRoute],
69) -> Result<RegionRequest> {
70    let regions_on_this_peer = find_leader_regions(region_routes, peer);
71    let mut requests = Vec::with_capacity(regions_on_this_peer.len());
72    let request_builder =
73        create_region_request_builder(create_table_expr, physical_table_id).unwrap();
74
75    let catalog = &create_table_expr.catalog_name;
76    let schema = &create_table_expr.schema_name;
77    let storage_path = region_storage_path(catalog, schema);
78    let partition_exprs = region_routes
79        .iter()
80        .map(|r| (r.region.id.region_number(), r.region.partition_expr()))
81        .collect::<HashMap<_, _>>();
82
83    for region_number in &regions_on_this_peer {
84        let region_id = RegionId::new(logical_table_id, *region_number);
85        let region_request = request_builder.build_one(
86            region_id,
87            storage_path.clone(),
88            &HashMap::new(),
89            &partition_exprs,
90        );
91        requests.push(region_request);
92    }
93
94    Ok(RegionRequest {
95        header: Some(RegionRequestHeader::default()),
96        body: Some(region_request::Body::Creates(CreateRequests { requests })),
97    })
98}