common_meta/ddl/
test_util.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod alter_table;
16pub mod columns;
17pub mod create_table;
18pub mod datanode_handler;
19pub mod flownode_handler;
20
21use std::collections::HashMap;
22
23use api::v1::meta::Partition;
24use api::v1::{ColumnDataType, SemanticType};
25use common_procedure::Status;
26use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
27use table::metadata::{RawTableInfo, TableId};
28
29use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
30use crate::ddl::test_util::columns::TestColumnDefBuilder;
31use crate::ddl::test_util::create_table::{
32    build_raw_table_info_from_expr, TestCreateTableExprBuilder,
33};
34use crate::ddl::{DdlContext, TableMetadata};
35use crate::key::table_route::TableRouteValue;
36use crate::rpc::ddl::CreateTableTask;
37
38pub async fn create_physical_table_metadata(
39    ddl_context: &DdlContext,
40    table_info: RawTableInfo,
41    table_route: TableRouteValue,
42) {
43    ddl_context
44        .table_metadata_manager
45        .create_table_metadata(table_info, table_route, HashMap::default())
46        .await
47        .unwrap();
48}
49
50pub async fn create_physical_table(ddl_context: &DdlContext, name: &str) -> TableId {
51    // Prepares physical table metadata.
52    let mut create_physical_table_task = test_create_physical_table_task(name);
53    let TableMetadata {
54        table_id,
55        table_route,
56        ..
57    } = ddl_context
58        .table_metadata_allocator
59        .create(&create_physical_table_task)
60        .await
61        .unwrap();
62    create_physical_table_task.set_table_id(table_id);
63    create_physical_table_metadata(
64        ddl_context,
65        create_physical_table_task.table_info.clone(),
66        TableRouteValue::Physical(table_route),
67    )
68    .await;
69
70    table_id
71}
72
73pub async fn create_logical_table(
74    ddl_context: DdlContext,
75    physical_table_id: TableId,
76    table_name: &str,
77) -> TableId {
78    use std::assert_matches::assert_matches;
79
80    let tasks = vec![test_create_logical_table_task(table_name)];
81    let mut procedure = CreateLogicalTablesProcedure::new(tasks, physical_table_id, ddl_context);
82    let status = procedure.on_prepare().await.unwrap();
83    assert_matches!(
84        status,
85        Status::Executing {
86            persist: true,
87            clean_poisons: false
88        }
89    );
90    let status = procedure.on_create_metadata().await.unwrap();
91    assert_matches!(status, Status::Done { .. });
92
93    let Status::Done {
94        output: Some(output),
95    } = status
96    else {
97        panic!("Unexpected status: {:?}", status);
98    };
99    output.downcast_ref::<Vec<u32>>().unwrap()[0]
100}
101
102pub fn test_create_logical_table_task(name: &str) -> CreateTableTask {
103    let create_table = TestCreateTableExprBuilder::default()
104        .column_defs([
105            TestColumnDefBuilder::default()
106                .name("ts")
107                .data_type(ColumnDataType::TimestampMillisecond)
108                .semantic_type(SemanticType::Timestamp)
109                .build()
110                .unwrap()
111                .into(),
112            TestColumnDefBuilder::default()
113                .name("host")
114                .data_type(ColumnDataType::String)
115                .semantic_type(SemanticType::Tag)
116                .build()
117                .unwrap()
118                .into(),
119            TestColumnDefBuilder::default()
120                .name("cpu")
121                .data_type(ColumnDataType::Float64)
122                .semantic_type(SemanticType::Field)
123                .build()
124                .unwrap()
125                .into(),
126        ])
127        .time_index("ts")
128        .primary_keys(["host".into()])
129        .table_name(name)
130        .engine(METRIC_ENGINE_NAME)
131        .table_options(HashMap::from([(
132            LOGICAL_TABLE_METADATA_KEY.to_string(),
133            "phy".to_string(),
134        )]))
135        .build()
136        .unwrap()
137        .into();
138    let table_info = build_raw_table_info_from_expr(&create_table);
139    CreateTableTask {
140        create_table,
141        // Single region
142        partitions: vec![Partition {
143            column_list: vec![],
144            value_list: vec![],
145        }],
146        table_info,
147    }
148}
149
150pub fn test_create_physical_table_task(name: &str) -> CreateTableTask {
151    let create_table = TestCreateTableExprBuilder::default()
152        .column_defs([
153            TestColumnDefBuilder::default()
154                .name("ts")
155                .data_type(ColumnDataType::TimestampMillisecond)
156                .semantic_type(SemanticType::Timestamp)
157                .build()
158                .unwrap()
159                .into(),
160            TestColumnDefBuilder::default()
161                .name("value")
162                .data_type(ColumnDataType::Float64)
163                .semantic_type(SemanticType::Field)
164                .build()
165                .unwrap()
166                .into(),
167        ])
168        .time_index("ts")
169        .primary_keys(["value".into()])
170        .table_name(name)
171        .engine(METRIC_ENGINE_NAME)
172        .build()
173        .unwrap()
174        .into();
175    let table_info = build_raw_table_info_from_expr(&create_table);
176    CreateTableTask {
177        create_table,
178        // Single region
179        partitions: vec![Partition {
180            column_list: vec![],
181            value_list: vec![],
182        }],
183        table_info,
184    }
185}