common_meta/ddl/
test_util.rs1pub mod alter_table;
16pub mod columns;
17pub mod create_table;
18pub mod datanode_handler;
19pub mod flownode_handler;
20
21use std::assert_matches::assert_matches;
22use std::collections::HashMap;
23
24use api::v1::meta::Partition;
25use api::v1::{ColumnDataType, SemanticType};
26use common_procedure::Status;
27use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
28use table::metadata::{RawTableInfo, TableId};
29
30use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
31use crate::ddl::test_util::columns::TestColumnDefBuilder;
32use crate::ddl::test_util::create_table::{
33 build_raw_table_info_from_expr, TestCreateTableExprBuilder,
34};
35use crate::ddl::{DdlContext, TableMetadata};
36use crate::key::table_route::TableRouteValue;
37use crate::rpc::ddl::CreateTableTask;
38
39pub async fn create_physical_table_metadata(
40 ddl_context: &DdlContext,
41 table_info: RawTableInfo,
42 table_route: TableRouteValue,
43) {
44 ddl_context
45 .table_metadata_manager
46 .create_table_metadata(table_info, table_route, HashMap::default())
47 .await
48 .unwrap();
49}
50
51pub async fn create_physical_table(ddl_context: &DdlContext, name: &str) -> TableId {
52 let mut create_physical_table_task = test_create_physical_table_task(name);
54 let TableMetadata {
55 table_id,
56 table_route,
57 ..
58 } = ddl_context
59 .table_metadata_allocator
60 .create(&create_physical_table_task)
61 .await
62 .unwrap();
63 create_physical_table_task.set_table_id(table_id);
64 create_physical_table_metadata(
65 ddl_context,
66 create_physical_table_task.table_info.clone(),
67 TableRouteValue::Physical(table_route),
68 )
69 .await;
70
71 table_id
72}
73
74pub async fn create_logical_table(
75 ddl_context: DdlContext,
76 physical_table_id: TableId,
77 table_name: &str,
78) -> TableId {
79 let tasks = vec![test_create_logical_table_task(table_name)];
80 let mut procedure = CreateLogicalTablesProcedure::new(tasks, physical_table_id, ddl_context);
81 let status = procedure.on_prepare().await.unwrap();
82 assert_matches!(
83 status,
84 Status::Executing {
85 persist: true,
86 clean_poisons: false
87 }
88 );
89 let status = procedure.on_create_metadata().await.unwrap();
90 assert_matches!(status, Status::Done { .. });
91
92 let Status::Done {
93 output: Some(output),
94 } = status
95 else {
96 panic!("Unexpected status: {:?}", status);
97 };
98 output.downcast_ref::<Vec<u32>>().unwrap()[0]
99}
100
101pub fn test_create_logical_table_task(name: &str) -> CreateTableTask {
102 let create_table = TestCreateTableExprBuilder::default()
103 .column_defs([
104 TestColumnDefBuilder::default()
105 .name("ts")
106 .data_type(ColumnDataType::TimestampMillisecond)
107 .semantic_type(SemanticType::Timestamp)
108 .build()
109 .unwrap()
110 .into(),
111 TestColumnDefBuilder::default()
112 .name("host")
113 .data_type(ColumnDataType::String)
114 .semantic_type(SemanticType::Tag)
115 .build()
116 .unwrap()
117 .into(),
118 TestColumnDefBuilder::default()
119 .name("cpu")
120 .data_type(ColumnDataType::Float64)
121 .semantic_type(SemanticType::Field)
122 .build()
123 .unwrap()
124 .into(),
125 ])
126 .time_index("ts")
127 .primary_keys(["host".into()])
128 .table_name(name)
129 .engine(METRIC_ENGINE_NAME)
130 .table_options(HashMap::from([(
131 LOGICAL_TABLE_METADATA_KEY.to_string(),
132 "phy".to_string(),
133 )]))
134 .build()
135 .unwrap()
136 .into();
137 let table_info = build_raw_table_info_from_expr(&create_table);
138 CreateTableTask {
139 create_table,
140 partitions: vec![Partition {
142 column_list: vec![],
143 value_list: vec![],
144 }],
145 table_info,
146 }
147}
148
149pub fn test_create_physical_table_task(name: &str) -> CreateTableTask {
150 let create_table = TestCreateTableExprBuilder::default()
151 .column_defs([
152 TestColumnDefBuilder::default()
153 .name("ts")
154 .data_type(ColumnDataType::TimestampMillisecond)
155 .semantic_type(SemanticType::Timestamp)
156 .build()
157 .unwrap()
158 .into(),
159 TestColumnDefBuilder::default()
160 .name("value")
161 .data_type(ColumnDataType::Float64)
162 .semantic_type(SemanticType::Field)
163 .build()
164 .unwrap()
165 .into(),
166 ])
167 .time_index("ts")
168 .primary_keys(["value".into()])
169 .table_name(name)
170 .engine(METRIC_ENGINE_NAME)
171 .build()
172 .unwrap()
173 .into();
174 let table_info = build_raw_table_info_from_expr(&create_table);
175 CreateTableTask {
176 create_table,
177 partitions: vec![Partition {
179 column_list: vec![],
180 value_list: vec![],
181 }],
182 table_info,
183 }
184}