common_meta/ddl/
test_util.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod alter_table;
16pub mod columns;
17pub mod create_table;
18pub mod datanode_handler;
19pub mod flownode_handler;
20
21use std::assert_matches::assert_matches;
22use std::collections::HashMap;
23
24use api::v1::meta::Partition;
25use api::v1::{ColumnDataType, SemanticType};
26use common_procedure::Status;
27use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
28use table::metadata::{RawTableInfo, TableId};
29
30use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
31use crate::ddl::test_util::columns::TestColumnDefBuilder;
32use crate::ddl::test_util::create_table::{
33    build_raw_table_info_from_expr, TestCreateTableExprBuilder,
34};
35use crate::ddl::{DdlContext, TableMetadata};
36use crate::key::table_route::TableRouteValue;
37use crate::rpc::ddl::CreateTableTask;
38
39pub async fn create_physical_table_metadata(
40    ddl_context: &DdlContext,
41    table_info: RawTableInfo,
42    table_route: TableRouteValue,
43) {
44    ddl_context
45        .table_metadata_manager
46        .create_table_metadata(table_info, table_route, HashMap::default())
47        .await
48        .unwrap();
49}
50
51pub async fn create_physical_table(ddl_context: &DdlContext, name: &str) -> TableId {
52    // Prepares physical table metadata.
53    let mut create_physical_table_task = test_create_physical_table_task(name);
54    let TableMetadata {
55        table_id,
56        table_route,
57        ..
58    } = ddl_context
59        .table_metadata_allocator
60        .create(&create_physical_table_task)
61        .await
62        .unwrap();
63    create_physical_table_task.set_table_id(table_id);
64    create_physical_table_metadata(
65        ddl_context,
66        create_physical_table_task.table_info.clone(),
67        TableRouteValue::Physical(table_route),
68    )
69    .await;
70
71    table_id
72}
73
74pub async fn create_logical_table(
75    ddl_context: DdlContext,
76    physical_table_id: TableId,
77    table_name: &str,
78) -> TableId {
79    let tasks = vec![test_create_logical_table_task(table_name)];
80    let mut procedure = CreateLogicalTablesProcedure::new(tasks, physical_table_id, ddl_context);
81    let status = procedure.on_prepare().await.unwrap();
82    assert_matches!(
83        status,
84        Status::Executing {
85            persist: true,
86            clean_poisons: false
87        }
88    );
89    let status = procedure.on_create_metadata().await.unwrap();
90    assert_matches!(status, Status::Done { .. });
91
92    let Status::Done {
93        output: Some(output),
94    } = status
95    else {
96        panic!("Unexpected status: {:?}", status);
97    };
98    output.downcast_ref::<Vec<u32>>().unwrap()[0]
99}
100
101pub fn test_create_logical_table_task(name: &str) -> CreateTableTask {
102    let create_table = TestCreateTableExprBuilder::default()
103        .column_defs([
104            TestColumnDefBuilder::default()
105                .name("ts")
106                .data_type(ColumnDataType::TimestampMillisecond)
107                .semantic_type(SemanticType::Timestamp)
108                .build()
109                .unwrap()
110                .into(),
111            TestColumnDefBuilder::default()
112                .name("host")
113                .data_type(ColumnDataType::String)
114                .semantic_type(SemanticType::Tag)
115                .build()
116                .unwrap()
117                .into(),
118            TestColumnDefBuilder::default()
119                .name("cpu")
120                .data_type(ColumnDataType::Float64)
121                .semantic_type(SemanticType::Field)
122                .build()
123                .unwrap()
124                .into(),
125        ])
126        .time_index("ts")
127        .primary_keys(["host".into()])
128        .table_name(name)
129        .engine(METRIC_ENGINE_NAME)
130        .table_options(HashMap::from([(
131            LOGICAL_TABLE_METADATA_KEY.to_string(),
132            "phy".to_string(),
133        )]))
134        .build()
135        .unwrap()
136        .into();
137    let table_info = build_raw_table_info_from_expr(&create_table);
138    CreateTableTask {
139        create_table,
140        // Single region
141        partitions: vec![Partition {
142            column_list: vec![],
143            value_list: vec![],
144        }],
145        table_info,
146    }
147}
148
149pub fn test_create_physical_table_task(name: &str) -> CreateTableTask {
150    let create_table = TestCreateTableExprBuilder::default()
151        .column_defs([
152            TestColumnDefBuilder::default()
153                .name("ts")
154                .data_type(ColumnDataType::TimestampMillisecond)
155                .semantic_type(SemanticType::Timestamp)
156                .build()
157                .unwrap()
158                .into(),
159            TestColumnDefBuilder::default()
160                .name("value")
161                .data_type(ColumnDataType::Float64)
162                .semantic_type(SemanticType::Field)
163                .build()
164                .unwrap()
165                .into(),
166        ])
167        .time_index("ts")
168        .primary_keys(["value".into()])
169        .table_name(name)
170        .engine(METRIC_ENGINE_NAME)
171        .build()
172        .unwrap()
173        .into();
174    let table_info = build_raw_table_info_from_expr(&create_table);
175    CreateTableTask {
176        create_table,
177        // Single region
178        partitions: vec![Partition {
179            column_list: vec![],
180            value_list: vec![],
181        }],
182        table_info,
183    }
184}