#[cfg(test)]
mod tests {
    use std::borrow::Cow;
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::sync::atomic::AtomicU32;

    use api::v1::region::QueryRequest;
    use client::OutputData;
    use common_base::Plugins;
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use common_meta::key::table_name::TableNameKey;
    use common_meta::rpc::router::region_distribution;
    use common_query::Output;
    use common_recordbatch::RecordBatches;
    use common_telemetry::debug;
    use datafusion_expr::LogicalPlan;
    use frontend::error::{self, Error, Result};
    use frontend::instance::Instance;
    use query::parser::QueryLanguageParser;
    use query::query_engine::DefaultSerializer;
    use servers::interceptor::{SqlQueryInterceptor, SqlQueryInterceptorRef};
    use servers::query_handler::sql::SqlQueryHandler;
    use session::context::{QueryContext, QueryContextRef};
    use sql::statements::statement::Statement;
    use store_api::storage::RegionId;
    use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};

    use crate::standalone::GreptimeDbStandaloneBuilder;
    use crate::tests;
    use crate::tests::MockDistributedInstance;

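    /// Runs the full SQL lifecycle (CREATE TABLE, INSERT, SELECT, DROP TABLE)
    /// against a standalone instance.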
    #[tokio::test(flavor = "multi_thread")]
    async fn test_standalone_exec_sql() {
        let standalone = GreptimeDbStandaloneBuilder::new("test_standalone_exec_sql")
            .build()
            .await;
        let instance = standalone.fe_instance();

        let sql = r#"
            CREATE TABLE demo(
                host STRING,
                ts TIMESTAMP,
                cpu DOUBLE NULL,
                memory DOUBLE NULL,
                disk_util DOUBLE DEFAULT 9.9,
                TIME INDEX (ts),
                PRIMARY KEY(host)
            ) engine=mito"#;
        create_table(instance, sql).await;

        insert_and_query(instance).await;

        drop_table(instance).await;
    }

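    /// Same lifecycle as the standalone test, but on a table partitioned by
    /// `host` across four regions; additionally verifies that each row landed
    /// in the region its partition rule prescribes, and that dropping the
    /// table removes it from the catalog.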
    #[tokio::test(flavor = "multi_thread")]
    async fn test_distributed_exec_sql() {
        common_telemetry::init_default_ut_logging();

        let distributed = tests::create_distributed_instance("test_distributed_exec_sql").await;
        let frontend = distributed.frontend();
        let instance = frontend.as_ref();

        let sql = r#"
            CREATE TABLE demo(
                host STRING,
                ts TIMESTAMP,
                cpu DOUBLE NULL,
                memory DOUBLE NULL,
                disk_util DOUBLE DEFAULT 9.9,
                TIME INDEX (ts),
                PRIMARY KEY(host)
            )
            PARTITION ON COLUMNS (host) (
                host < '550-A',
                host >= '550-A' AND host < '550-W',
                host >= '550-W' AND host < 'MOSS',
                host >= 'MOSS'
            )
            engine=mito"#;
        create_table(instance, sql).await;

        insert_and_query(instance).await;

        // Each inserted host value falls into a different partition, so every
        // region should hold exactly one row.
        verify_data_distribution(
            &distributed,
            HashMap::from([
                (
                    0u32,
                    "\
+---------------------+------+
| ts                  | host |
+---------------------+------+
| 2013-12-31T16:00:00 | 490  |
+---------------------+------+",
                ),
                (
                    1u32,
                    "\
+---------------------+-------+
| ts                  | host  |
+---------------------+-------+
| 2022-12-31T16:00:00 | 550-A |
+---------------------+-------+",
                ),
                (
                    2u32,
                    "\
+---------------------+-------+
| ts                  | host  |
+---------------------+-------+
| 2023-12-31T16:00:00 | 550-W |
+---------------------+-------+",
                ),
                (
                    3u32,
                    "\
+---------------------+------+
| ts                  | host |
+---------------------+------+
| 2043-12-31T16:00:00 | MOSS |
+---------------------+------+",
                ),
            ]),
        )
        .await;

        drop_table(instance).await;

        verify_table_is_dropped(&distributed).await;
    }

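    /// Sends `sql` through the frontend's `SqlQueryHandler` and returns the
    /// first statement's output, panicking on error.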
    async fn query(instance: &Instance, sql: &str) -> Output {
        SqlQueryHandler::do_query(instance, sql, QueryContext::arc())
            .await
            .remove(0)
            .unwrap()
    }

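    /// Executes a DDL statement and asserts it reports zero affected rows.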
    async fn create_table(instance: &Instance, sql: &str) {
        let output = query(instance, sql).await;
        let OutputData::AffectedRows(x) = output.data else {
            unreachable!()
        };
        assert_eq!(x, 0);
    }

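    /// Inserts four rows (one per partition in the distributed case) and
    /// verifies that a full scan returns them all, with the `disk_util`
    /// default of 9.9 filled in for every row.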
    async fn insert_and_query(instance: &Instance) {
        let sql = r#"INSERT INTO demo(host, cpu, memory, ts) VALUES
            ('490', 0.1, 1, 1388505600000),
            ('550-A', 1, 100, 1672502400000),
            ('550-W', 10000, 1000000, 1704038400000),
            ('MOSS', 100000000, 10000000000, 2335190400000)
        "#;
        let output = query(instance, sql).await;
        let OutputData::AffectedRows(x) = output.data else {
            unreachable!()
        };
        assert_eq!(x, 4);

        let sql = "SELECT * FROM demo WHERE ts > cast(1000000000 as timestamp) ORDER BY host";
        let output = query(instance, sql).await;
        let OutputData::Stream(s) = output.data else {
            unreachable!()
        };
        let batches = common_recordbatch::util::collect_batches(s).await.unwrap();
        let pretty_print = batches.pretty_print().unwrap();
        let expected = "\
+-------+---------------------+-------------+---------------+-----------+
| host  | ts                  | cpu         | memory        | disk_util |
+-------+---------------------+-------------+---------------+-----------+
| 490   | 2013-12-31T16:00:00 | 0.1         | 1.0           | 9.9       |
| 550-A | 2022-12-31T16:00:00 | 1.0         | 100.0         | 9.9       |
| 550-W | 2023-12-31T16:00:00 | 10000.0     | 1000000.0     | 9.9       |
| MOSS  | 2043-12-31T16:00:00 | 100000000.0 | 10000000000.0 | 9.9       |
+-------+---------------------+-------------+---------------+-----------+";
        assert_eq!(pretty_print, expected);
    }

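    /// Checks that each region holds exactly the expected rows: looks up the
    /// table's region routes in metadata, encodes a scan plan via substrait,
    /// then executes it directly against each region on its datanode.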
    async fn verify_data_distribution(
        instance: &MockDistributedInstance,
        expected_distribution: HashMap<u32, &str>,
    ) {
        let manager = instance.table_metadata_manager();
        let table_id = manager
            .table_name_manager()
            .get(TableNameKey::new(
                DEFAULT_CATALOG_NAME,
                DEFAULT_SCHEMA_NAME,
                "demo",
            ))
            .await
            .unwrap()
            .unwrap()
            .table_id();
        debug!("Reading table {table_id}");

        let table_route_value = manager
            .table_route_manager()
            .table_route_storage()
            .get(table_id)
            .await
            .unwrap()
            .unwrap();

        // Map each region number to the datanode holding its leader replica.
        let region_to_dn_map = region_distribution(
            table_route_value
                .region_routes()
                .expect("region routes should be physical"),
        )
        .iter()
        .map(|(k, v)| (v.leader_regions[0], *k))
        .collect::<HashMap<u32, u64>>();
        assert!(region_to_dn_map.len() <= instance.datanodes().len());

        let stmt = QueryLanguageParser::parse_sql(
            "SELECT ts, host FROM demo ORDER BY ts",
            &QueryContext::arc(),
        )
        .unwrap();
        let plan = instance
            .frontend()
            .statement_executor()
            .plan(&stmt, QueryContext::arc())
            .await
            .unwrap();
        let plan = DFLogicalSubstraitConvertor
            .encode(&plan, DefaultSerializer)
            .unwrap();

        // Run the substrait-encoded scan on each region's datanode and compare
        // the result with the rows expected for that region.
        for (region, dn) in region_to_dn_map.iter() {
            let region_server = instance.datanodes().get(dn).unwrap().region_server();

            let region_id = RegionId::new(table_id, *region);

            let stream = region_server
                .handle_remote_read(
                    QueryRequest {
                        region_id: region_id.as_u64(),
                        plan: plan.to_vec(),
                        ..Default::default()
                    },
                    QueryContext::arc(),
                )
                .await
                .unwrap();

            let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
            let actual = recordbatches.pretty_print().unwrap();

            let expected = expected_distribution.get(region).unwrap();
            assert_eq!(&actual, expected);
        }
    }

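    /// Drops the `demo` table and asserts the DDL reports zero affected rows.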
    async fn drop_table(instance: &Instance) {
        let sql = "DROP TABLE demo";
        let output = query(instance, sql).await;
        let OutputData::AffectedRows(x) = output.data else {
            unreachable!()
        };
        assert_eq!(x, 0);
    }

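    /// Asserts the `demo` table can no longer be resolved via the catalog manager.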
    async fn verify_table_is_dropped(instance: &MockDistributedInstance) {
        assert!(
            instance
                .frontend()
                .catalog_manager()
                .table("greptime", "public", "demo", None)
                .await
                .unwrap()
                .is_none()
        )
    }

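    /// Registers an interceptor that asserts each of the four hook points
    /// (pre_parsing, post_parsing, pre_execute, post_execute) fires exactly
    /// once, and that post_execute can rewrite the query output.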
    #[tokio::test(flavor = "multi_thread")]
    async fn test_sql_interceptor_plugin() {
        #[derive(Default)]
        struct AssertionHook {
            pub(crate) c: AtomicU32,
        }

        impl SqlQueryInterceptor for AssertionHook {
            type Error = Error;

            fn pre_parsing<'a>(
                &self,
                query: &'a str,
                _query_ctx: QueryContextRef,
            ) -> Result<Cow<'a, str>> {
                let _ = self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                assert!(query.starts_with("CREATE TABLE demo"));
                Ok(Cow::Borrowed(query))
            }

            fn post_parsing(
                &self,
                statements: Vec<Statement>,
                _query_ctx: QueryContextRef,
            ) -> Result<Vec<Statement>> {
                let _ = self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                assert!(matches!(statements[0], Statement::CreateTable(_)));
                Ok(statements)
            }

            fn pre_execute(
                &self,
                _statement: &Statement,
                _plan: Option<&LogicalPlan>,
                _query_ctx: QueryContextRef,
            ) -> Result<()> {
                let _ = self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                Ok(())
            }

            fn post_execute(
                &self,
                mut output: Output,
                _query_ctx: QueryContextRef,
            ) -> Result<Output> {
                let _ = self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                match &mut output.data {
                    OutputData::AffectedRows(rows) => {
                        assert_eq!(*rows, 0);
                        // Rewrite the result so the test can observe this hook ran.
                        *rows = 10;
                    }
                    _ => unreachable!(),
                }
                Ok(output)
            }
        }

        let plugins = Plugins::new();
        let counter_hook = Arc::new(AssertionHook::default());
        plugins.insert::<SqlQueryInterceptorRef<Error>>(counter_hook.clone());

        let standalone = GreptimeDbStandaloneBuilder::new("test_sql_interceptor_plugin")
            .with_plugin(plugins)
            .build()
            .await;
        let instance = standalone.fe_instance().clone();

        let sql = r#"CREATE TABLE demo(
            host STRING,
            ts TIMESTAMP,
            cpu DOUBLE NULL,
            memory DOUBLE NULL,
            disk_util DOUBLE DEFAULT 9.9,
            TIME INDEX (ts),
            PRIMARY KEY(host)
        ) engine=mito;"#;
        let output = SqlQueryHandler::do_query(&*instance, sql, QueryContext::arc())
            .await
            .remove(0)
            .unwrap();

        // All four interceptor hooks should have fired exactly once.
        assert_eq!(4, counter_hook.c.load(std::sync::atomic::Ordering::Relaxed));
        match output.data {
            OutputData::AffectedRows(rows) => assert_eq!(rows, 10),
            _ => unreachable!(),
        }
    }

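    /// Registers an interceptor that rejects database-level statements at
    /// post_parsing, then verifies table DDL still works while CREATE DATABASE
    /// and SHOW DATABASES fail with `NotSupported`.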
    #[tokio::test(flavor = "multi_thread")]
    async fn test_disable_db_operation_plugin() {
        #[derive(Default)]
        struct DisableDBOpHook;

        impl SqlQueryInterceptor for DisableDBOpHook {
            type Error = Error;

            fn post_parsing(
                &self,
                statements: Vec<Statement>,
                _query_ctx: QueryContextRef,
            ) -> Result<Vec<Statement>> {
                // Reject database-level statements before they reach execution.
                for s in &statements {
                    match s {
                        Statement::CreateDatabase(_) | Statement::ShowDatabases(_) => {
                            return Err(Error::NotSupported {
                                feat: "Database operations".to_owned(),
                            });
                        }
                        _ => {}
                    }
                }

                Ok(statements)
            }
        }

        let query_ctx = QueryContext::arc();

        let plugins = Plugins::new();
        let hook = Arc::new(DisableDBOpHook);
        plugins.insert::<SqlQueryInterceptorRef<Error>>(hook.clone());

        let standalone = GreptimeDbStandaloneBuilder::new("test_disable_db_operation_plugin")
            .with_plugin(plugins)
            .build()
            .await;
        let instance = standalone.fe_instance().clone();

        let sql = r#"CREATE TABLE demo(
            host STRING,
            ts TIMESTAMP,
            cpu DOUBLE NULL,
            memory DOUBLE NULL,
            disk_util DOUBLE DEFAULT 9.9,
            TIME INDEX (ts),
            PRIMARY KEY(host)
        ) engine=mito;"#;
        let output = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
            .await
            .remove(0)
            .unwrap();

        // Table DDL is unaffected by the hook.
        match output.data {
            OutputData::AffectedRows(rows) => assert_eq!(rows, 0),
            _ => unreachable!(),
        }

        let sql = r#"CREATE DATABASE tomcat"#;
        if let Err(e) = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
            .await
            .remove(0)
        {
            assert!(matches!(e, error::Error::NotSupported { .. }));
        } else {
            unreachable!();
        }

        // A batch containing a database operation is rejected as a whole.
        let sql = r#"SELECT 1; SHOW DATABASES"#;
        if let Err(e) = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
            .await
            .remove(0)
        {
            assert!(matches!(e, error::Error::NotSupported { .. }));
        } else {
            unreachable!();
        }
    }
}