1pub mod alter_table;
16pub mod columns;
17pub mod create_table;
18pub mod datanode_handler;
19pub mod flownode_handler;
20pub mod region_metadata;
21
22use std::assert_matches;
23use std::collections::HashMap;
24
25use api::v1::meta::Partition;
26use api::v1::{ColumnDataType, SemanticType};
27use common_procedure::Status;
28use datatypes::prelude::ConcreteDataType;
29use datatypes::schema::ColumnSchema;
30use store_api::metadata::ColumnMetadata;
31use store_api::metric_engine_consts::{
32 DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY,
33 METRIC_ENGINE_NAME,
34};
35use store_api::storage::consts::ReservedColumnId;
36use table::metadata::{TableId, TableInfo};
37
38use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
39use crate::ddl::test_util::columns::TestColumnDefBuilder;
40use crate::ddl::test_util::create_table::{
41 TestCreateTableExprBuilder, build_raw_table_info_from_expr,
42};
43use crate::ddl::{DdlContext, TableMetadata};
44use crate::key::node_address::{NodeAddressKey, NodeAddressValue};
45use crate::key::table_route::TableRouteValue;
46use crate::key::{MetadataKey, MetadataValue};
47use crate::peer::Peer;
48use crate::rpc::ddl::CreateTableTask;
49use crate::rpc::store::PutRequest;
50
51pub async fn create_physical_table_metadata(
52 ddl_context: &DdlContext,
53 table_info: TableInfo,
54 table_route: TableRouteValue,
55) {
56 ddl_context
57 .table_metadata_manager
58 .create_table_metadata(table_info, table_route, HashMap::default())
59 .await
60 .unwrap();
61}
62
63pub async fn put_datanode_address(ddl_context: &DdlContext, node_id: u64, addr: &str) {
64 ddl_context
65 .table_metadata_manager
66 .kv_backend()
67 .put(PutRequest {
68 key: NodeAddressKey::with_datanode(node_id).to_bytes(),
69 value: NodeAddressValue::new(Peer::new(node_id, addr))
70 .try_as_raw_value()
71 .unwrap(),
72 ..Default::default()
73 })
74 .await
75 .unwrap();
76}
77
78pub async fn create_physical_table(ddl_context: &DdlContext, name: &str) -> TableId {
79 let mut create_physical_table_task = test_create_physical_table_task(name);
81 let TableMetadata {
82 table_id,
83 table_route,
84 ..
85 } = ddl_context
86 .table_metadata_allocator
87 .create(&create_physical_table_task)
88 .await
89 .unwrap();
90 create_physical_table_task.set_table_id(table_id);
91 create_physical_table_metadata(
92 ddl_context,
93 create_physical_table_task.table_info.clone(),
94 TableRouteValue::Physical(table_route),
95 )
96 .await;
97
98 table_id
99}
100
101pub async fn create_logical_table(
102 ddl_context: DdlContext,
103 physical_table_id: TableId,
104 table_name: &str,
105) -> TableId {
106 let tasks = vec![test_create_logical_table_task(table_name)];
107 let mut procedure = CreateLogicalTablesProcedure::new(tasks, physical_table_id, ddl_context);
108 let status = procedure.on_prepare().await.unwrap();
109 assert_matches!(
110 status,
111 Status::Executing {
112 persist: true,
113 clean_poisons: false
114 }
115 );
116 let status = procedure.on_create_metadata().await.unwrap();
117 assert_matches!(status, Status::Done { .. });
118
119 let Status::Done {
120 output: Some(output),
121 } = status
122 else {
123 panic!("Unexpected status: {:?}", status);
124 };
125 output.downcast_ref::<Vec<u32>>().unwrap()[0]
126}
127
128pub fn test_create_logical_table_task(name: &str) -> CreateTableTask {
129 let create_table = TestCreateTableExprBuilder::default()
130 .column_defs([
131 TestColumnDefBuilder::default()
132 .name("ts")
133 .data_type(ColumnDataType::TimestampMillisecond)
134 .semantic_type(SemanticType::Timestamp)
135 .build()
136 .unwrap()
137 .into(),
138 TestColumnDefBuilder::default()
139 .name("host")
140 .data_type(ColumnDataType::String)
141 .semantic_type(SemanticType::Tag)
142 .build()
143 .unwrap()
144 .into(),
145 TestColumnDefBuilder::default()
146 .name("cpu")
147 .data_type(ColumnDataType::Float64)
148 .semantic_type(SemanticType::Field)
149 .build()
150 .unwrap()
151 .into(),
152 ])
153 .time_index("ts")
154 .primary_keys(["host".into()])
155 .table_name(name)
156 .engine(METRIC_ENGINE_NAME)
157 .table_options(HashMap::from([(
158 LOGICAL_TABLE_METADATA_KEY.to_string(),
159 "phy".to_string(),
160 )]))
161 .build()
162 .unwrap()
163 .into();
164 let table_info = build_raw_table_info_from_expr(&create_table);
165 CreateTableTask {
166 create_table,
167 partitions: vec![Partition::default()],
169 table_info,
170 }
171}
172
173pub fn test_create_physical_table_task(name: &str) -> CreateTableTask {
175 let create_table = TestCreateTableExprBuilder::default()
176 .column_defs([
177 TestColumnDefBuilder::default()
178 .name("ts")
179 .data_type(ColumnDataType::TimestampMillisecond)
180 .semantic_type(SemanticType::Timestamp)
181 .build()
182 .unwrap()
183 .into(),
184 TestColumnDefBuilder::default()
185 .name("value")
186 .data_type(ColumnDataType::Float64)
187 .semantic_type(SemanticType::Field)
188 .build()
189 .unwrap()
190 .into(),
191 ])
192 .time_index("ts")
193 .primary_keys(["value".into()])
194 .table_name(name)
195 .engine(METRIC_ENGINE_NAME)
196 .build()
197 .unwrap()
198 .into();
199 let table_info = build_raw_table_info_from_expr(&create_table);
200 CreateTableTask {
201 create_table,
202 partitions: vec![Partition::default()],
204 table_info,
205 }
206}
207
208pub fn test_column_metadatas(tag_fields: &[&str]) -> Vec<ColumnMetadata> {
210 let mut output = Vec::with_capacity(tag_fields.len() + 4);
211 output.extend([
212 ColumnMetadata {
213 column_schema: ColumnSchema::new(
214 "ts",
215 ConcreteDataType::timestamp_millisecond_datatype(),
216 false,
217 ),
218 semantic_type: SemanticType::Timestamp,
219 column_id: 0,
220 },
221 ColumnMetadata {
222 column_schema: ColumnSchema::new("value", ConcreteDataType::float64_datatype(), false),
223 semantic_type: SemanticType::Field,
224 column_id: 1,
225 },
226 ColumnMetadata {
227 column_schema: ColumnSchema::new(
228 DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
229 ConcreteDataType::timestamp_millisecond_datatype(),
230 false,
231 ),
232 semantic_type: SemanticType::Tag,
233 column_id: ReservedColumnId::table_id(),
234 },
235 ColumnMetadata {
236 column_schema: ColumnSchema::new(
237 DATA_SCHEMA_TSID_COLUMN_NAME,
238 ConcreteDataType::float64_datatype(),
239 false,
240 ),
241 semantic_type: SemanticType::Tag,
242 column_id: ReservedColumnId::tsid(),
243 },
244 ]);
245
246 for (i, name) in tag_fields.iter().enumerate() {
247 output.push(ColumnMetadata {
248 column_schema: ColumnSchema::new(
249 name.to_string(),
250 ConcreteDataType::string_datatype(),
251 true,
252 ),
253 semantic_type: SemanticType::Tag,
254 column_id: (i + 2) as u32,
255 });
256 }
257
258 output
259}
260
261pub fn assert_column_name(table_info: &TableInfo, expected_column_names: &[&str]) {
263 assert_eq!(
264 table_info
265 .meta
266 .schema
267 .column_schemas()
268 .iter()
269 .map(|c| c.name.clone())
270 .collect::<Vec<_>>(),
271 expected_column_names
272 );
273}
274
275pub fn assert_column_name_and_id(column_metadatas: &[ColumnMetadata], expected: &[(&str, u32)]) {
277 assert_eq!(expected.len(), column_metadatas.len());
278 for (name, id) in expected {
279 let column_metadata = column_metadatas
280 .iter()
281 .find(|c| c.column_id == *id)
282 .unwrap();
283 assert_eq!(column_metadata.column_schema.name, *name);
284 }
285}
286
287pub async fn get_raw_table_info(ddl_context: &DdlContext, table_id: TableId) -> TableInfo {
289 ddl_context
290 .table_metadata_manager
291 .table_info_manager()
292 .get(table_id)
293 .await
294 .unwrap()
295 .unwrap()
296 .into_inner()
297 .table_info
298}