1#[cfg(test)]
16mod tests {
17 use std::borrow::Cow;
18 use std::collections::HashMap;
19 use std::sync::Arc;
20 use std::sync::atomic::AtomicU32;
21
22 use api::v1::region::QueryRequest;
23 use client::OutputData;
24 use common_base::Plugins;
25 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
26 use common_error::ext::ErrorExt;
27 use common_error::status_code::StatusCode;
28 use common_meta::key::table_name::TableNameKey;
29 use common_meta::rpc::router::region_distribution;
30 use common_query::Output;
31 use common_recordbatch::RecordBatches;
32 use common_telemetry::debug;
33 use datafusion_expr::LogicalPlan;
34 use frontend::error::{Error, Result};
35 use frontend::instance::Instance;
36 use query::parser::QueryLanguageParser;
37 use query::query_engine::DefaultSerializer;
38 use servers::interceptor::{SqlQueryInterceptor, SqlQueryInterceptorRef};
39 use servers::query_handler::sql::SqlQueryHandler;
40 use session::context::{QueryContext, QueryContextRef};
41 use sql::statements::statement::Statement;
42 use store_api::storage::RegionId;
43 use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
44
45 use crate::standalone::GreptimeDbStandaloneBuilder;
46 use crate::tests;
47 use crate::tests::MockDistributedInstance;
48
49 #[tokio::test(flavor = "multi_thread")]
50 async fn test_standalone_exec_sql() {
51 let standalone = GreptimeDbStandaloneBuilder::new("test_standalone_exec_sql")
52 .build()
53 .await;
54 let instance = standalone.fe_instance();
55
56 let sql = r#"
57 CREATE TABLE demo(
58 host STRING,
59 ts TIMESTAMP,
60 cpu DOUBLE NULL,
61 memory DOUBLE NULL,
62 disk_util DOUBLE DEFAULT 9.9,
63 TIME INDEX (ts),
64 PRIMARY KEY(host)
65 ) engine=mito"#;
66 create_table(instance, sql).await;
67
68 insert_and_query(instance).await;
69
70 drop_table(instance).await;
71 }
72
73 #[tokio::test(flavor = "multi_thread")]
74 async fn test_distributed_exec_sql() {
75 common_telemetry::init_default_ut_logging();
76
77 let distributed = tests::create_distributed_instance("test_distributed_exec_sql").await;
78 let frontend = distributed.frontend();
79 let instance = frontend.as_ref();
80
81 let sql = r#"
82 CREATE TABLE demo(
83 host STRING,
84 ts TIMESTAMP,
85 cpu DOUBLE NULL,
86 memory DOUBLE NULL,
87 disk_util DOUBLE DEFAULT 9.9,
88 TIME INDEX (ts),
89 PRIMARY KEY(host)
90 )
91 PARTITION ON COLUMNS (host) (
92 host < '550-A',
93 host >= '550-A' AND host < '550-W',
94 host >= '550-W' AND host < 'MOSS',
95 host >= 'MOSS'
96 )
97 engine=mito"#;
98 create_table(instance, sql).await;
99
100 insert_and_query(instance).await;
101
102 verify_data_distribution(
103 &distributed,
104 HashMap::from([
105 (
106 0u32,
107 "\
108+---------------------+------+
109| ts | host |
110+---------------------+------+
111| 2013-12-31T16:00:00 | 490 |
112+---------------------+------+",
113 ),
114 (
115 1u32,
116 "\
117+---------------------+-------+
118| ts | host |
119+---------------------+-------+
120| 2022-12-31T16:00:00 | 550-A |
121+---------------------+-------+",
122 ),
123 (
124 2u32,
125 "\
126+---------------------+-------+
127| ts | host |
128+---------------------+-------+
129| 2023-12-31T16:00:00 | 550-W |
130+---------------------+-------+",
131 ),
132 (
133 3u32,
134 "\
135+---------------------+------+
136| ts | host |
137+---------------------+------+
138| 2043-12-31T16:00:00 | MOSS |
139+---------------------+------+",
140 ),
141 ]),
142 )
143 .await;
144
145 drop_table(instance).await;
146
147 verify_table_is_dropped(&distributed).await;
148 }
149
150 async fn query(instance: &Instance, sql: &str) -> Output {
151 SqlQueryHandler::do_query(instance, sql, QueryContext::arc())
152 .await
153 .remove(0)
154 .unwrap()
155 }
156
157 async fn create_table(instance: &Instance, sql: &str) {
158 let output = query(instance, sql).await;
159 let OutputData::AffectedRows(x) = output.data else {
160 unreachable!()
161 };
162 assert_eq!(x, 0);
163 }
164
165 async fn insert_and_query(instance: &Instance) {
166 let sql = r#"INSERT INTO demo(host, cpu, memory, ts) VALUES
167 ('490', 0.1, 1, 1388505600000),
168 ('550-A', 1, 100, 1672502400000),
169 ('550-W', 10000, 1000000, 1704038400000),
170 ('MOSS', 100000000, 10000000000, 2335190400000)
171 "#;
172 let output = query(instance, sql).await;
173 let OutputData::AffectedRows(x) = output.data else {
174 unreachable!()
175 };
176 assert_eq!(x, 4);
177
178 let sql = "SELECT * FROM demo WHERE ts > cast(1000000000 as timestamp) ORDER BY host"; let output = query(instance, sql).await;
180 let OutputData::Stream(s) = output.data else {
181 unreachable!()
182 };
183 let batches = common_recordbatch::util::collect_batches(s).await.unwrap();
184 let pretty_print = batches.pretty_print().unwrap();
185 let expected = "\
186+-------+---------------------+-------------+---------------+-----------+
187| host | ts | cpu | memory | disk_util |
188+-------+---------------------+-------------+---------------+-----------+
189| 490 | 2013-12-31T16:00:00 | 0.1 | 1.0 | 9.9 |
190| 550-A | 2022-12-31T16:00:00 | 1.0 | 100.0 | 9.9 |
191| 550-W | 2023-12-31T16:00:00 | 10000.0 | 1000000.0 | 9.9 |
192| MOSS | 2043-12-31T16:00:00 | 100000000.0 | 10000000000.0 | 9.9 |
193+-------+---------------------+-------------+---------------+-----------+";
194 assert_eq!(pretty_print, expected);
195 }
196
197 async fn verify_data_distribution(
198 instance: &MockDistributedInstance,
199 expected_distribution: HashMap<u32, &str>,
200 ) {
201 let manager = instance.table_metadata_manager();
202 let table_id = manager
203 .table_name_manager()
204 .get(TableNameKey::new(
205 DEFAULT_CATALOG_NAME,
206 DEFAULT_SCHEMA_NAME,
207 "demo",
208 ))
209 .await
210 .unwrap()
211 .unwrap()
212 .table_id();
213 debug!("Reading table {table_id}");
214
215 let table_route_value = manager
216 .table_route_manager()
217 .table_route_storage()
218 .get(table_id)
219 .await
220 .unwrap()
221 .unwrap();
222
223 let region_to_dn_map = region_distribution(
224 table_route_value
225 .region_routes()
226 .expect("region routes should be physical"),
227 )
228 .iter()
229 .map(|(k, v)| (v.leader_regions[0], *k))
230 .collect::<HashMap<u32, u64>>();
231 assert!(region_to_dn_map.len() <= instance.datanodes().len());
232
233 let stmt = QueryLanguageParser::parse_sql(
234 "SELECT ts, host FROM demo ORDER BY ts",
235 &QueryContext::arc(),
236 )
237 .unwrap();
238 let plan = instance
239 .frontend()
240 .statement_executor()
241 .plan(&stmt, QueryContext::arc())
242 .await
243 .unwrap();
244 let plan = DFLogicalSubstraitConvertor
245 .encode(&plan, DefaultSerializer)
246 .unwrap();
247
248 for (region, dn) in region_to_dn_map.iter() {
249 let region_server = instance.datanodes().get(dn).unwrap().region_server();
250
251 let region_id = RegionId::new(table_id, *region);
252
253 let stream = region_server
254 .handle_remote_read(
255 QueryRequest {
256 region_id: region_id.as_u64(),
257 plan: plan.to_vec(),
258 ..Default::default()
259 },
260 QueryContext::arc(),
261 )
262 .await
263 .unwrap();
264
265 let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
266 let actual = recordbatches.pretty_print().unwrap();
267
268 let expected = expected_distribution.get(region).unwrap();
269 assert_eq!(&actual, expected);
270 }
271 }
272
273 async fn drop_table(instance: &Instance) {
274 let sql = "DROP TABLE demo";
275 let output = query(instance, sql).await;
276 let OutputData::AffectedRows(x) = output.data else {
277 unreachable!()
278 };
279 assert_eq!(x, 0);
280 }
281
282 async fn verify_table_is_dropped(instance: &MockDistributedInstance) {
283 assert!(
284 instance
285 .frontend()
286 .catalog_manager()
287 .table("greptime", "public", "demo", None)
288 .await
289 .unwrap()
290 .is_none()
291 )
292 }
293
294 #[tokio::test(flavor = "multi_thread")]
295 async fn test_sql_interceptor_plugin() {
296 #[derive(Default)]
297 struct AssertionHook {
298 pub(crate) c: AtomicU32,
299 }
300
301 impl SqlQueryInterceptor for AssertionHook {
302 type Error = Error;
303
304 fn pre_parsing<'a>(
305 &self,
306 query: &'a str,
307 _query_ctx: QueryContextRef,
308 ) -> Result<Cow<'a, str>> {
309 let _ = self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
310 assert!(query.starts_with("CREATE TABLE demo"));
311 Ok(Cow::Borrowed(query))
312 }
313
314 fn post_parsing(
315 &self,
316 statements: Vec<Statement>,
317 _query_ctx: QueryContextRef,
318 ) -> Result<Vec<Statement>> {
319 let _ = self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
320 assert!(matches!(statements[0], Statement::CreateTable(_)));
321 Ok(statements)
322 }
323
324 fn pre_execute(
325 &self,
326 _statement: &Statement,
327 _plan: Option<&LogicalPlan>,
328 _query_ctx: QueryContextRef,
329 ) -> Result<()> {
330 let _ = self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
331 Ok(())
332 }
333
334 fn post_execute(
335 &self,
336 mut output: Output,
337 _query_ctx: QueryContextRef,
338 ) -> Result<Output> {
339 let _ = self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
340 match &mut output.data {
341 OutputData::AffectedRows(rows) => {
342 assert_eq!(*rows, 0);
343 *rows = 10;
345 }
346 _ => unreachable!(),
347 }
348 Ok(output)
349 }
350 }
351
352 let plugins = Plugins::new();
353 let counter_hook = Arc::new(AssertionHook::default());
354 plugins.insert::<SqlQueryInterceptorRef<Error>>(counter_hook.clone());
355
356 let standalone = GreptimeDbStandaloneBuilder::new("test_sql_interceptor_plugin")
357 .with_plugin(plugins)
358 .build()
359 .await;
360 let instance = standalone.fe_instance().clone();
361
362 let sql = r#"CREATE TABLE demo(
363 host STRING,
364 ts TIMESTAMP,
365 cpu DOUBLE NULL,
366 memory DOUBLE NULL,
367 disk_util DOUBLE DEFAULT 9.9,
368 TIME INDEX (ts),
369 PRIMARY KEY(host)
370 ) engine=mito;"#;
371 let output = SqlQueryHandler::do_query(&*instance, sql, QueryContext::arc())
372 .await
373 .remove(0)
374 .unwrap();
375
376 assert_eq!(4, counter_hook.c.load(std::sync::atomic::Ordering::Relaxed));
378 match output.data {
379 OutputData::AffectedRows(rows) => assert_eq!(rows, 10),
380 _ => unreachable!(),
381 }
382 }
383
384 #[tokio::test(flavor = "multi_thread")]
385 async fn test_disable_db_operation_plugin() {
386 #[derive(Default)]
387 struct DisableDBOpHook;
388
389 impl SqlQueryInterceptor for DisableDBOpHook {
390 type Error = Error;
391
392 fn post_parsing(
393 &self,
394 statements: Vec<Statement>,
395 _query_ctx: QueryContextRef,
396 ) -> Result<Vec<Statement>> {
397 for s in &statements {
398 match s {
399 Statement::CreateDatabase(_) | Statement::ShowDatabases(_) => {
400 return Err(Error::NotSupported {
401 feat: "Database operations".to_owned(),
402 });
403 }
404 _ => {}
405 }
406 }
407
408 Ok(statements)
409 }
410 }
411
412 let query_ctx = QueryContext::arc();
413
414 let plugins = Plugins::new();
415 let hook = Arc::new(DisableDBOpHook);
416 plugins.insert::<SqlQueryInterceptorRef<Error>>(hook.clone());
417
418 let standalone = GreptimeDbStandaloneBuilder::new("test_disable_db_operation_plugin")
419 .with_plugin(plugins)
420 .build()
421 .await;
422 let instance = standalone.fe_instance().clone();
423
424 let sql = r#"CREATE TABLE demo(
425 host STRING,
426 ts TIMESTAMP,
427 cpu DOUBLE NULL,
428 memory DOUBLE NULL,
429 disk_util DOUBLE DEFAULT 9.9,
430 TIME INDEX (ts),
431 PRIMARY KEY(host)
432 ) engine=mito;"#;
433 let output = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
434 .await
435 .remove(0)
436 .unwrap();
437
438 match output.data {
439 OutputData::AffectedRows(rows) => assert_eq!(rows, 0),
440 _ => unreachable!(),
441 }
442
443 let sql = r#"CREATE DATABASE tomcat"#;
444 if let Err(e) = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
445 .await
446 .remove(0)
447 {
448 assert_eq!(e.status_code(), StatusCode::Unsupported);
449 } else {
450 unreachable!();
451 }
452
453 let sql = r#"SELECT 1; SHOW DATABASES"#;
454 if let Err(e) = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
455 .await
456 .remove(0)
457 {
458 assert_eq!(e.status_code(), StatusCode::Unsupported);
459 } else {
460 unreachable!();
461 }
462 }
463}