// common_meta/ddl/test_util.rs
pub mod alter_table;
16pub mod columns;
17pub mod create_table;
18pub mod datanode_handler;
19pub mod flownode_handler;
20pub mod region_metadata;
21
22use std::assert_matches::assert_matches;
23use std::collections::HashMap;
24
25use api::v1::meta::Partition;
26use api::v1::{ColumnDataType, SemanticType};
27use common_procedure::Status;
28use datatypes::prelude::ConcreteDataType;
29use datatypes::schema::ColumnSchema;
30use store_api::metadata::ColumnMetadata;
31use store_api::metric_engine_consts::{
32 DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY,
33 METRIC_ENGINE_NAME,
34};
35use store_api::storage::consts::ReservedColumnId;
36use table::metadata::{RawTableInfo, TableId};
37
38use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
39use crate::ddl::test_util::columns::TestColumnDefBuilder;
40use crate::ddl::test_util::create_table::{
41 build_raw_table_info_from_expr, TestCreateTableExprBuilder,
42};
43use crate::ddl::{DdlContext, TableMetadata};
44use crate::key::table_route::TableRouteValue;
45use crate::rpc::ddl::CreateTableTask;
46
47pub async fn create_physical_table_metadata(
48 ddl_context: &DdlContext,
49 table_info: RawTableInfo,
50 table_route: TableRouteValue,
51) {
52 ddl_context
53 .table_metadata_manager
54 .create_table_metadata(table_info, table_route, HashMap::default())
55 .await
56 .unwrap();
57}
58
59pub async fn create_physical_table(ddl_context: &DdlContext, name: &str) -> TableId {
60 let mut create_physical_table_task = test_create_physical_table_task(name);
62 let TableMetadata {
63 table_id,
64 table_route,
65 ..
66 } = ddl_context
67 .table_metadata_allocator
68 .create(&create_physical_table_task)
69 .await
70 .unwrap();
71 create_physical_table_task.set_table_id(table_id);
72 create_physical_table_metadata(
73 ddl_context,
74 create_physical_table_task.table_info.clone(),
75 TableRouteValue::Physical(table_route),
76 )
77 .await;
78
79 table_id
80}
81
82pub async fn create_logical_table(
83 ddl_context: DdlContext,
84 physical_table_id: TableId,
85 table_name: &str,
86) -> TableId {
87 let tasks = vec![test_create_logical_table_task(table_name)];
88 let mut procedure = CreateLogicalTablesProcedure::new(tasks, physical_table_id, ddl_context);
89 let status = procedure.on_prepare().await.unwrap();
90 assert_matches!(
91 status,
92 Status::Executing {
93 persist: true,
94 clean_poisons: false
95 }
96 );
97 let status = procedure.on_create_metadata().await.unwrap();
98 assert_matches!(status, Status::Done { .. });
99
100 let Status::Done {
101 output: Some(output),
102 } = status
103 else {
104 panic!("Unexpected status: {:?}", status);
105 };
106 output.downcast_ref::<Vec<u32>>().unwrap()[0]
107}
108
109pub fn test_create_logical_table_task(name: &str) -> CreateTableTask {
110 let create_table = TestCreateTableExprBuilder::default()
111 .column_defs([
112 TestColumnDefBuilder::default()
113 .name("ts")
114 .data_type(ColumnDataType::TimestampMillisecond)
115 .semantic_type(SemanticType::Timestamp)
116 .build()
117 .unwrap()
118 .into(),
119 TestColumnDefBuilder::default()
120 .name("host")
121 .data_type(ColumnDataType::String)
122 .semantic_type(SemanticType::Tag)
123 .build()
124 .unwrap()
125 .into(),
126 TestColumnDefBuilder::default()
127 .name("cpu")
128 .data_type(ColumnDataType::Float64)
129 .semantic_type(SemanticType::Field)
130 .build()
131 .unwrap()
132 .into(),
133 ])
134 .time_index("ts")
135 .primary_keys(["host".into()])
136 .table_name(name)
137 .engine(METRIC_ENGINE_NAME)
138 .table_options(HashMap::from([(
139 LOGICAL_TABLE_METADATA_KEY.to_string(),
140 "phy".to_string(),
141 )]))
142 .build()
143 .unwrap()
144 .into();
145 let table_info = build_raw_table_info_from_expr(&create_table);
146 CreateTableTask {
147 create_table,
148 partitions: vec![Partition::default()],
150 table_info,
151 }
152}
153
154pub fn test_create_physical_table_task(name: &str) -> CreateTableTask {
156 let create_table = TestCreateTableExprBuilder::default()
157 .column_defs([
158 TestColumnDefBuilder::default()
159 .name("ts")
160 .data_type(ColumnDataType::TimestampMillisecond)
161 .semantic_type(SemanticType::Timestamp)
162 .build()
163 .unwrap()
164 .into(),
165 TestColumnDefBuilder::default()
166 .name("value")
167 .data_type(ColumnDataType::Float64)
168 .semantic_type(SemanticType::Field)
169 .build()
170 .unwrap()
171 .into(),
172 ])
173 .time_index("ts")
174 .primary_keys(["value".into()])
175 .table_name(name)
176 .engine(METRIC_ENGINE_NAME)
177 .build()
178 .unwrap()
179 .into();
180 let table_info = build_raw_table_info_from_expr(&create_table);
181 CreateTableTask {
182 create_table,
183 partitions: vec![Partition::default()],
185 table_info,
186 }
187}
188
189pub fn test_column_metadatas(tag_fields: &[&str]) -> Vec<ColumnMetadata> {
191 let mut output = Vec::with_capacity(tag_fields.len() + 4);
192 output.extend([
193 ColumnMetadata {
194 column_schema: ColumnSchema::new(
195 "ts",
196 ConcreteDataType::timestamp_millisecond_datatype(),
197 false,
198 ),
199 semantic_type: SemanticType::Timestamp,
200 column_id: 0,
201 },
202 ColumnMetadata {
203 column_schema: ColumnSchema::new("value", ConcreteDataType::float64_datatype(), false),
204 semantic_type: SemanticType::Field,
205 column_id: 1,
206 },
207 ColumnMetadata {
208 column_schema: ColumnSchema::new(
209 DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
210 ConcreteDataType::timestamp_millisecond_datatype(),
211 false,
212 ),
213 semantic_type: SemanticType::Tag,
214 column_id: ReservedColumnId::table_id(),
215 },
216 ColumnMetadata {
217 column_schema: ColumnSchema::new(
218 DATA_SCHEMA_TSID_COLUMN_NAME,
219 ConcreteDataType::float64_datatype(),
220 false,
221 ),
222 semantic_type: SemanticType::Tag,
223 column_id: ReservedColumnId::tsid(),
224 },
225 ]);
226
227 for (i, name) in tag_fields.iter().enumerate() {
228 output.push(ColumnMetadata {
229 column_schema: ColumnSchema::new(
230 name.to_string(),
231 ConcreteDataType::string_datatype(),
232 true,
233 ),
234 semantic_type: SemanticType::Tag,
235 column_id: (i + 2) as u32,
236 });
237 }
238
239 output
240}
241
242pub fn assert_column_name(table_info: &RawTableInfo, expected_column_names: &[&str]) {
244 assert_eq!(
245 table_info
246 .meta
247 .schema
248 .column_schemas
249 .iter()
250 .map(|c| c.name.to_string())
251 .collect::<Vec<_>>(),
252 expected_column_names
253 );
254}
255
256pub fn assert_column_name_and_id(column_metadatas: &[ColumnMetadata], expected: &[(&str, u32)]) {
258 assert_eq!(expected.len(), column_metadatas.len());
259 for (name, id) in expected {
260 let column_metadata = column_metadatas
261 .iter()
262 .find(|c| c.column_id == *id)
263 .unwrap();
264 assert_eq!(column_metadata.column_schema.name, *name);
265 }
266}
267
268pub async fn get_raw_table_info(ddl_context: &DdlContext, table_id: TableId) -> RawTableInfo {
270 ddl_context
271 .table_metadata_manager
272 .table_info_manager()
273 .get(table_id)
274 .await
275 .unwrap()
276 .unwrap()
277 .into_inner()
278 .table_info
279}