common_meta/ddl/
test_util.rs1pub mod alter_table;
16pub mod columns;
17pub mod create_table;
18pub mod datanode_handler;
19pub mod flownode_handler;
20
21use std::collections::HashMap;
22
23use api::v1::meta::Partition;
24use api::v1::{ColumnDataType, SemanticType};
25use common_procedure::Status;
26use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
27use table::metadata::{RawTableInfo, TableId};
28
29use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
30use crate::ddl::test_util::columns::TestColumnDefBuilder;
31use crate::ddl::test_util::create_table::{
32 build_raw_table_info_from_expr, TestCreateTableExprBuilder,
33};
34use crate::ddl::{DdlContext, TableMetadata};
35use crate::key::table_route::TableRouteValue;
36use crate::rpc::ddl::CreateTableTask;
37
38pub async fn create_physical_table_metadata(
39 ddl_context: &DdlContext,
40 table_info: RawTableInfo,
41 table_route: TableRouteValue,
42) {
43 ddl_context
44 .table_metadata_manager
45 .create_table_metadata(table_info, table_route, HashMap::default())
46 .await
47 .unwrap();
48}
49
50pub async fn create_physical_table(ddl_context: &DdlContext, name: &str) -> TableId {
51 let mut create_physical_table_task = test_create_physical_table_task(name);
53 let TableMetadata {
54 table_id,
55 table_route,
56 ..
57 } = ddl_context
58 .table_metadata_allocator
59 .create(&create_physical_table_task)
60 .await
61 .unwrap();
62 create_physical_table_task.set_table_id(table_id);
63 create_physical_table_metadata(
64 ddl_context,
65 create_physical_table_task.table_info.clone(),
66 TableRouteValue::Physical(table_route),
67 )
68 .await;
69
70 table_id
71}
72
73pub async fn create_logical_table(
74 ddl_context: DdlContext,
75 physical_table_id: TableId,
76 table_name: &str,
77) -> TableId {
78 use std::assert_matches::assert_matches;
79
80 let tasks = vec![test_create_logical_table_task(table_name)];
81 let mut procedure = CreateLogicalTablesProcedure::new(tasks, physical_table_id, ddl_context);
82 let status = procedure.on_prepare().await.unwrap();
83 assert_matches!(
84 status,
85 Status::Executing {
86 persist: true,
87 clean_poisons: false
88 }
89 );
90 let status = procedure.on_create_metadata().await.unwrap();
91 assert_matches!(status, Status::Done { .. });
92
93 let Status::Done {
94 output: Some(output),
95 } = status
96 else {
97 panic!("Unexpected status: {:?}", status);
98 };
99 output.downcast_ref::<Vec<u32>>().unwrap()[0]
100}
101
102pub fn test_create_logical_table_task(name: &str) -> CreateTableTask {
103 let create_table = TestCreateTableExprBuilder::default()
104 .column_defs([
105 TestColumnDefBuilder::default()
106 .name("ts")
107 .data_type(ColumnDataType::TimestampMillisecond)
108 .semantic_type(SemanticType::Timestamp)
109 .build()
110 .unwrap()
111 .into(),
112 TestColumnDefBuilder::default()
113 .name("host")
114 .data_type(ColumnDataType::String)
115 .semantic_type(SemanticType::Tag)
116 .build()
117 .unwrap()
118 .into(),
119 TestColumnDefBuilder::default()
120 .name("cpu")
121 .data_type(ColumnDataType::Float64)
122 .semantic_type(SemanticType::Field)
123 .build()
124 .unwrap()
125 .into(),
126 ])
127 .time_index("ts")
128 .primary_keys(["host".into()])
129 .table_name(name)
130 .engine(METRIC_ENGINE_NAME)
131 .table_options(HashMap::from([(
132 LOGICAL_TABLE_METADATA_KEY.to_string(),
133 "phy".to_string(),
134 )]))
135 .build()
136 .unwrap()
137 .into();
138 let table_info = build_raw_table_info_from_expr(&create_table);
139 CreateTableTask {
140 create_table,
141 partitions: vec![Partition {
143 column_list: vec![],
144 value_list: vec![],
145 }],
146 table_info,
147 }
148}
149
150pub fn test_create_physical_table_task(name: &str) -> CreateTableTask {
151 let create_table = TestCreateTableExprBuilder::default()
152 .column_defs([
153 TestColumnDefBuilder::default()
154 .name("ts")
155 .data_type(ColumnDataType::TimestampMillisecond)
156 .semantic_type(SemanticType::Timestamp)
157 .build()
158 .unwrap()
159 .into(),
160 TestColumnDefBuilder::default()
161 .name("value")
162 .data_type(ColumnDataType::Float64)
163 .semantic_type(SemanticType::Field)
164 .build()
165 .unwrap()
166 .into(),
167 ])
168 .time_index("ts")
169 .primary_keys(["value".into()])
170 .table_name(name)
171 .engine(METRIC_ENGINE_NAME)
172 .build()
173 .unwrap()
174 .into();
175 let table_info = build_raw_table_info_from_expr(&create_table);
176 CreateTableTask {
177 create_table,
178 partitions: vec![Partition {
180 column_list: vec![],
181 value_list: vec![],
182 }],
183 table_info,
184 }
185}