Skip to main content

common_meta/ddl/
test_util.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod alter_table;
16pub mod columns;
17pub mod create_table;
18pub mod datanode_handler;
19pub mod flownode_handler;
20pub mod region_metadata;
21
22use std::assert_matches;
23use std::collections::HashMap;
24
25use api::v1::meta::Partition;
26use api::v1::{ColumnDataType, SemanticType};
27use common_procedure::Status;
28use datatypes::prelude::ConcreteDataType;
29use datatypes::schema::ColumnSchema;
30use store_api::metadata::ColumnMetadata;
31use store_api::metric_engine_consts::{
32    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY,
33    METRIC_ENGINE_NAME,
34};
35use store_api::storage::consts::ReservedColumnId;
36use table::metadata::{TableId, TableInfo};
37
38use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
39use crate::ddl::test_util::columns::TestColumnDefBuilder;
40use crate::ddl::test_util::create_table::{
41    TestCreateTableExprBuilder, build_raw_table_info_from_expr,
42};
43use crate::ddl::{DdlContext, TableMetadata};
44use crate::key::node_address::{NodeAddressKey, NodeAddressValue};
45use crate::key::table_route::TableRouteValue;
46use crate::key::{MetadataKey, MetadataValue};
47use crate::peer::Peer;
48use crate::rpc::ddl::CreateTableTask;
49use crate::rpc::store::PutRequest;
50
51pub async fn create_physical_table_metadata(
52    ddl_context: &DdlContext,
53    table_info: TableInfo,
54    table_route: TableRouteValue,
55) {
56    ddl_context
57        .table_metadata_manager
58        .create_table_metadata(table_info, table_route, HashMap::default())
59        .await
60        .unwrap();
61}
62
63pub async fn put_datanode_address(ddl_context: &DdlContext, node_id: u64, addr: &str) {
64    ddl_context
65        .table_metadata_manager
66        .kv_backend()
67        .put(PutRequest {
68            key: NodeAddressKey::with_datanode(node_id).to_bytes(),
69            value: NodeAddressValue::new(Peer::new(node_id, addr))
70                .try_as_raw_value()
71                .unwrap(),
72            ..Default::default()
73        })
74        .await
75        .unwrap();
76}
77
78pub async fn create_physical_table(ddl_context: &DdlContext, name: &str) -> TableId {
79    // Prepares physical table metadata.
80    let mut create_physical_table_task = test_create_physical_table_task(name);
81    let TableMetadata {
82        table_id,
83        table_route,
84        ..
85    } = ddl_context
86        .table_metadata_allocator
87        .create(&create_physical_table_task)
88        .await
89        .unwrap();
90    create_physical_table_task.set_table_id(table_id);
91    create_physical_table_metadata(
92        ddl_context,
93        create_physical_table_task.table_info.clone(),
94        TableRouteValue::Physical(table_route),
95    )
96    .await;
97
98    table_id
99}
100
101pub async fn create_logical_table(
102    ddl_context: DdlContext,
103    physical_table_id: TableId,
104    table_name: &str,
105) -> TableId {
106    let tasks = vec![test_create_logical_table_task(table_name)];
107    let mut procedure = CreateLogicalTablesProcedure::new(tasks, physical_table_id, ddl_context);
108    let status = procedure.on_prepare().await.unwrap();
109    assert_matches!(
110        status,
111        Status::Executing {
112            persist: true,
113            clean_poisons: false
114        }
115    );
116    let status = procedure.on_create_metadata().await.unwrap();
117    assert_matches!(status, Status::Done { .. });
118
119    let Status::Done {
120        output: Some(output),
121    } = status
122    else {
123        panic!("Unexpected status: {:?}", status);
124    };
125    output.downcast_ref::<Vec<u32>>().unwrap()[0]
126}
127
128pub fn test_create_logical_table_task(name: &str) -> CreateTableTask {
129    let create_table = TestCreateTableExprBuilder::default()
130        .column_defs([
131            TestColumnDefBuilder::default()
132                .name("ts")
133                .data_type(ColumnDataType::TimestampMillisecond)
134                .semantic_type(SemanticType::Timestamp)
135                .build()
136                .unwrap()
137                .into(),
138            TestColumnDefBuilder::default()
139                .name("host")
140                .data_type(ColumnDataType::String)
141                .semantic_type(SemanticType::Tag)
142                .build()
143                .unwrap()
144                .into(),
145            TestColumnDefBuilder::default()
146                .name("cpu")
147                .data_type(ColumnDataType::Float64)
148                .semantic_type(SemanticType::Field)
149                .build()
150                .unwrap()
151                .into(),
152        ])
153        .time_index("ts")
154        .primary_keys(["host".into()])
155        .table_name(name)
156        .engine(METRIC_ENGINE_NAME)
157        .table_options(HashMap::from([(
158            LOGICAL_TABLE_METADATA_KEY.to_string(),
159            "phy".to_string(),
160        )]))
161        .build()
162        .unwrap()
163        .into();
164    let table_info = build_raw_table_info_from_expr(&create_table);
165    CreateTableTask {
166        create_table,
167        // Single region
168        partitions: vec![Partition::default()],
169        table_info,
170    }
171}
172
173/// Creates a physical table task with a single region.
174pub fn test_create_physical_table_task(name: &str) -> CreateTableTask {
175    let create_table = TestCreateTableExprBuilder::default()
176        .column_defs([
177            TestColumnDefBuilder::default()
178                .name("ts")
179                .data_type(ColumnDataType::TimestampMillisecond)
180                .semantic_type(SemanticType::Timestamp)
181                .build()
182                .unwrap()
183                .into(),
184            TestColumnDefBuilder::default()
185                .name("value")
186                .data_type(ColumnDataType::Float64)
187                .semantic_type(SemanticType::Field)
188                .build()
189                .unwrap()
190                .into(),
191        ])
192        .time_index("ts")
193        .primary_keys(["value".into()])
194        .table_name(name)
195        .engine(METRIC_ENGINE_NAME)
196        .build()
197        .unwrap()
198        .into();
199    let table_info = build_raw_table_info_from_expr(&create_table);
200    CreateTableTask {
201        create_table,
202        // Single region
203        partitions: vec![Partition::default()],
204        table_info,
205    }
206}
207
208/// Creates a column metadata list with tag fields.
209pub fn test_column_metadatas(tag_fields: &[&str]) -> Vec<ColumnMetadata> {
210    let mut output = Vec::with_capacity(tag_fields.len() + 4);
211    output.extend([
212        ColumnMetadata {
213            column_schema: ColumnSchema::new(
214                "ts",
215                ConcreteDataType::timestamp_millisecond_datatype(),
216                false,
217            ),
218            semantic_type: SemanticType::Timestamp,
219            column_id: 0,
220        },
221        ColumnMetadata {
222            column_schema: ColumnSchema::new("value", ConcreteDataType::float64_datatype(), false),
223            semantic_type: SemanticType::Field,
224            column_id: 1,
225        },
226        ColumnMetadata {
227            column_schema: ColumnSchema::new(
228                DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
229                ConcreteDataType::timestamp_millisecond_datatype(),
230                false,
231            ),
232            semantic_type: SemanticType::Tag,
233            column_id: ReservedColumnId::table_id(),
234        },
235        ColumnMetadata {
236            column_schema: ColumnSchema::new(
237                DATA_SCHEMA_TSID_COLUMN_NAME,
238                ConcreteDataType::float64_datatype(),
239                false,
240            ),
241            semantic_type: SemanticType::Tag,
242            column_id: ReservedColumnId::tsid(),
243        },
244    ]);
245
246    for (i, name) in tag_fields.iter().enumerate() {
247        output.push(ColumnMetadata {
248            column_schema: ColumnSchema::new(
249                name.to_string(),
250                ConcreteDataType::string_datatype(),
251                true,
252            ),
253            semantic_type: SemanticType::Tag,
254            column_id: (i + 2) as u32,
255        });
256    }
257
258    output
259}
260
261/// Asserts the column names.
262pub fn assert_column_name(table_info: &TableInfo, expected_column_names: &[&str]) {
263    assert_eq!(
264        table_info
265            .meta
266            .schema
267            .column_schemas()
268            .iter()
269            .map(|c| c.name.clone())
270            .collect::<Vec<_>>(),
271        expected_column_names
272    );
273}
274
275/// Asserts the column metadatas
276pub fn assert_column_name_and_id(column_metadatas: &[ColumnMetadata], expected: &[(&str, u32)]) {
277    assert_eq!(expected.len(), column_metadatas.len());
278    for (name, id) in expected {
279        let column_metadata = column_metadatas
280            .iter()
281            .find(|c| c.column_id == *id)
282            .unwrap();
283        assert_eq!(column_metadata.column_schema.name, *name);
284    }
285}
286
287/// Gets the raw table info.
288pub async fn get_raw_table_info(ddl_context: &DdlContext, table_id: TableId) -> TableInfo {
289    ddl_context
290        .table_metadata_manager
291        .table_info_manager()
292        .get(table_id)
293        .await
294        .unwrap()
295        .unwrap()
296        .into_inner()
297        .table_info
298}