1#[cfg(test)]
16mod tests {
17 use std::borrow::Cow;
18 use std::collections::HashMap;
19 use std::sync::atomic::AtomicU32;
20 use std::sync::Arc;
21
22 use api::v1::region::QueryRequest;
23 use client::OutputData;
24 use common_base::Plugins;
25 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
26 use common_meta::key::table_name::TableNameKey;
27 use common_meta::rpc::router::region_distribution;
28 use common_query::Output;
29 use common_recordbatch::RecordBatches;
30 use common_telemetry::debug;
31 use datafusion_expr::LogicalPlan;
32 use frontend::error::{self, Error, Result};
33 use frontend::instance::Instance;
34 use query::parser::QueryLanguageParser;
35 use query::query_engine::DefaultSerializer;
36 use servers::interceptor::{SqlQueryInterceptor, SqlQueryInterceptorRef};
37 use servers::query_handler::sql::SqlQueryHandler;
38 use session::context::{QueryContext, QueryContextRef};
39 use sql::statements::statement::Statement;
40 use store_api::storage::RegionId;
41 use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
42
43 use crate::standalone::GreptimeDbStandaloneBuilder;
44 use crate::tests;
45 use crate::tests::MockDistributedInstance;
46
47 #[tokio::test(flavor = "multi_thread")]
48 async fn test_standalone_exec_sql() {
49 let standalone = GreptimeDbStandaloneBuilder::new("test_standalone_exec_sql")
50 .build()
51 .await;
52 let instance = standalone.fe_instance();
53
54 let sql = r#"
55 CREATE TABLE demo(
56 host STRING,
57 ts TIMESTAMP,
58 cpu DOUBLE NULL,
59 memory DOUBLE NULL,
60 disk_util DOUBLE DEFAULT 9.9,
61 TIME INDEX (ts),
62 PRIMARY KEY(host)
63 ) engine=mito"#;
64 create_table(instance, sql).await;
65
66 insert_and_query(instance).await;
67
68 drop_table(instance).await;
69 }
70
    #[tokio::test(flavor = "multi_thread")]
    async fn test_distributed_exec_sql() {
        common_telemetry::init_default_ut_logging();

        let distributed = tests::create_distributed_instance("test_distributed_exec_sql").await;
        let frontend = distributed.frontend();
        let instance = frontend.as_ref();

        // Partition on `host` into four ranges; each row inserted by
        // `insert_and_query` falls into a distinct range, so the physical
        // distribution can be verified region-by-region afterwards.
        let sql = r#"
CREATE TABLE demo(
            host STRING,
            ts TIMESTAMP,
            cpu DOUBLE NULL,
            memory DOUBLE NULL,
            disk_util DOUBLE DEFAULT 9.9,
            TIME INDEX (ts),
            PRIMARY KEY(host)
        )
        PARTITION ON COLUMNS (host) (
            host < '550-A',
            host >= '550-A' AND host < '550-W',
            host >= '550-W' AND host < 'MOSS',
            host >= 'MOSS'
        )
        engine=mito"#;
        create_table(instance, sql).await;

        insert_and_query(instance).await;

        // Expected per-region contents keyed by region number: exactly one of
        // the four inserted rows lands in each partition.
        verify_data_distribution(
            &distributed,
            HashMap::from([
                (
                    0u32,
                    "\
+---------------------+------+
| ts                  | host |
+---------------------+------+
| 2013-12-31T16:00:00 | 490  |
+---------------------+------+",
                ),
                (
                    1u32,
                    "\
+---------------------+-------+
| ts                  | host  |
+---------------------+-------+
| 2022-12-31T16:00:00 | 550-A |
+---------------------+-------+",
                ),
                (
                    2u32,
                    "\
+---------------------+-------+
| ts                  | host  |
+---------------------+-------+
| 2023-12-31T16:00:00 | 550-W |
+---------------------+-------+",
                ),
                (
                    3u32,
                    "\
+---------------------+------+
| ts                  | host |
+---------------------+------+
| 2043-12-31T16:00:00 | MOSS |
+---------------------+------+",
                ),
            ]),
        )
        .await;

        drop_table(instance).await;

        // The table must disappear from the frontend catalog as well.
        verify_table_is_dropped(&distributed).await;
    }
147
148 async fn query(instance: &Instance, sql: &str) -> Output {
149 SqlQueryHandler::do_query(instance, sql, QueryContext::arc())
150 .await
151 .remove(0)
152 .unwrap()
153 }
154
155 async fn create_table(instance: &Instance, sql: &str) {
156 let output = query(instance, sql).await;
157 let OutputData::AffectedRows(x) = output.data else {
158 unreachable!()
159 };
160 assert_eq!(x, 0);
161 }
162
163 async fn insert_and_query(instance: &Instance) {
164 let sql = r#"INSERT INTO demo(host, cpu, memory, ts) VALUES
165 ('490', 0.1, 1, 1388505600000),
166 ('550-A', 1, 100, 1672502400000),
167 ('550-W', 10000, 1000000, 1704038400000),
168 ('MOSS', 100000000, 10000000000, 2335190400000)
169 "#;
170 let output = query(instance, sql).await;
171 let OutputData::AffectedRows(x) = output.data else {
172 unreachable!()
173 };
174 assert_eq!(x, 4);
175
176 let sql = "SELECT * FROM demo WHERE ts > cast(1000000000 as timestamp) ORDER BY host"; let output = query(instance, sql).await;
178 let OutputData::Stream(s) = output.data else {
179 unreachable!()
180 };
181 let batches = common_recordbatch::util::collect_batches(s).await.unwrap();
182 let pretty_print = batches.pretty_print().unwrap();
183 let expected = "\
184+-------+---------------------+-------------+---------------+-----------+
185| host | ts | cpu | memory | disk_util |
186+-------+---------------------+-------------+---------------+-----------+
187| 490 | 2013-12-31T16:00:00 | 0.1 | 1.0 | 9.9 |
188| 550-A | 2022-12-31T16:00:00 | 1.0 | 100.0 | 9.9 |
189| 550-W | 2023-12-31T16:00:00 | 10000.0 | 1000000.0 | 9.9 |
190| MOSS | 2043-12-31T16:00:00 | 100000000.0 | 10000000000.0 | 9.9 |
191+-------+---------------------+-------------+---------------+-----------+";
192 assert_eq!(pretty_print, expected);
193 }
194
    /// Verifies each region of the `demo` table holds exactly the rows given
    /// in `expected_distribution` (keyed by region number) by running the same
    /// substrait-encoded scan directly against every leader region server.
    async fn verify_data_distribution(
        instance: &MockDistributedInstance,
        expected_distribution: HashMap<u32, &str>,
    ) {
        // Resolve the table id from its catalog/schema/name key.
        let manager = instance.table_metadata_manager();
        let table_id = manager
            .table_name_manager()
            .get(TableNameKey::new(
                DEFAULT_CATALOG_NAME,
                DEFAULT_SCHEMA_NAME,
                "demo",
            ))
            .await
            .unwrap()
            .unwrap()
            .table_id();
        debug!("Reading table {table_id}");

        let table_route_value = manager
            .table_route_manager()
            .table_route_storage()
            .get(table_id)
            .await
            .unwrap()
            .unwrap();

        // Build a map of region number -> id of the datanode leading it.
        let region_to_dn_map = region_distribution(
            table_route_value
                .region_routes()
                .expect("region routes should be physical"),
        )
        .iter()
        .map(|(k, v)| (v.leader_regions[0], *k))
        .collect::<HashMap<u32, u64>>();
        // A datanode may lead several regions, but never the reverse.
        assert!(region_to_dn_map.len() <= instance.datanodes().len());

        // Plan the scan once, encode it to substrait, and replay the encoded
        // plan on every region below.
        let stmt = QueryLanguageParser::parse_sql(
            "SELECT ts, host FROM demo ORDER BY ts",
            &QueryContext::arc(),
        )
        .unwrap();
        let plan = instance
            .frontend()
            .statement_executor()
            .plan(&stmt, QueryContext::arc())
            .await
            .unwrap();
        let plan = DFLogicalSubstraitConvertor
            .encode(&plan, DefaultSerializer)
            .unwrap();

        for (region, dn) in region_to_dn_map.iter() {
            let region_server = instance.datanodes().get(dn).unwrap().region_server();

            let region_id = RegionId::new(table_id, *region);

            // Bypass the frontend: read the region's data straight from the
            // datanode so only locally-stored rows are observed.
            let stream = region_server
                .handle_remote_read(QueryRequest {
                    region_id: region_id.as_u64(),
                    plan: plan.to_vec(),
                    ..Default::default()
                })
                .await
                .unwrap();

            let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
            let actual = recordbatches.pretty_print().unwrap();

            let expected = expected_distribution.get(region).unwrap();
            assert_eq!(&actual, expected);
        }
    }
267
268 async fn drop_table(instance: &Instance) {
269 let sql = "DROP TABLE demo";
270 let output = query(instance, sql).await;
271 let OutputData::AffectedRows(x) = output.data else {
272 unreachable!()
273 };
274 assert_eq!(x, 0);
275 }
276
277 async fn verify_table_is_dropped(instance: &MockDistributedInstance) {
278 assert!(instance
279 .frontend()
280 .catalog_manager()
281 .table("greptime", "public", "demo", None)
282 .await
283 .unwrap()
284 .is_none())
285 }
286
    #[tokio::test(flavor = "multi_thread")]
    async fn test_sql_interceptor_plugin() {
        // Hook that counts how many interceptor stages fire and asserts the
        // payload visible at each stage; `post_execute` also rewrites the
        // affected-row count so the test can prove its returned output is the
        // one the handler propagates.
        #[derive(Default)]
        struct AssertionHook {
            // Number of interceptor callbacks invoked so far.
            pub(crate) c: AtomicU32,
        }

        impl SqlQueryInterceptor for AssertionHook {
            type Error = Error;

            fn pre_parsing<'a>(
                &self,
                query: &'a str,
                _query_ctx: QueryContextRef,
            ) -> Result<Cow<'a, str>> {
                let _ = self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                assert!(query.starts_with("CREATE TABLE demo"));
                // Pass the query through unmodified.
                Ok(Cow::Borrowed(query))
            }

            fn post_parsing(
                &self,
                statements: Vec<Statement>,
                _query_ctx: QueryContextRef,
            ) -> Result<Vec<Statement>> {
                let _ = self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                assert!(matches!(statements[0], Statement::CreateTable(_)));
                Ok(statements)
            }

            fn pre_execute(
                &self,
                _statement: &Statement,
                _plan: Option<&LogicalPlan>,
                _query_ctx: QueryContextRef,
            ) -> Result<()> {
                let _ = self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                Ok(())
            }

            fn post_execute(
                &self,
                mut output: Output,
                _query_ctx: QueryContextRef,
            ) -> Result<Output> {
                let _ = self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                match &mut output.data {
                    OutputData::AffectedRows(rows) => {
                        assert_eq!(*rows, 0);
                        // Rewrite the result; the caller below checks for 10.
                        *rows = 10;
                    }
                    _ => unreachable!(),
                }
                Ok(output)
            }
        }

        let plugins = Plugins::new();
        let counter_hook = Arc::new(AssertionHook::default());
        plugins.insert::<SqlQueryInterceptorRef<Error>>(counter_hook.clone());

        let standalone = GreptimeDbStandaloneBuilder::new("test_sql_interceptor_plugin")
            .with_plugin(plugins)
            .build()
            .await;
        let instance = standalone.fe_instance().clone();

        let sql = r#"CREATE TABLE demo(
                            host STRING,
                            ts TIMESTAMP,
                            cpu DOUBLE NULL,
                            memory DOUBLE NULL,
                            disk_util DOUBLE DEFAULT 9.9,
                            TIME INDEX (ts),
                            PRIMARY KEY(host)
                        ) engine=mito;"#;
        let output = SqlQueryHandler::do_query(&*instance, sql, QueryContext::arc())
            .await
            .remove(0)
            .unwrap();

        // All four interceptor stages must have fired exactly once.
        assert_eq!(4, counter_hook.c.load(std::sync::atomic::Ordering::Relaxed));
        // `post_execute` rewrote the affected-row count from 0 to 10.
        match output.data {
            OutputData::AffectedRows(rows) => assert_eq!(rows, 10),
            _ => unreachable!(),
        }
    }
376
377 #[tokio::test(flavor = "multi_thread")]
378 async fn test_disable_db_operation_plugin() {
379 #[derive(Default)]
380 struct DisableDBOpHook;
381
382 impl SqlQueryInterceptor for DisableDBOpHook {
383 type Error = Error;
384
385 fn post_parsing(
386 &self,
387 statements: Vec<Statement>,
388 _query_ctx: QueryContextRef,
389 ) -> Result<Vec<Statement>> {
390 for s in &statements {
391 match s {
392 Statement::CreateDatabase(_) | Statement::ShowDatabases(_) => {
393 return Err(Error::NotSupported {
394 feat: "Database operations".to_owned(),
395 })
396 }
397 _ => {}
398 }
399 }
400
401 Ok(statements)
402 }
403 }
404
405 let query_ctx = QueryContext::arc();
406
407 let plugins = Plugins::new();
408 let hook = Arc::new(DisableDBOpHook);
409 plugins.insert::<SqlQueryInterceptorRef<Error>>(hook.clone());
410
411 let standalone = GreptimeDbStandaloneBuilder::new("test_disable_db_operation_plugin")
412 .with_plugin(plugins)
413 .build()
414 .await;
415 let instance = standalone.fe_instance().clone();
416
417 let sql = r#"CREATE TABLE demo(
418 host STRING,
419 ts TIMESTAMP,
420 cpu DOUBLE NULL,
421 memory DOUBLE NULL,
422 disk_util DOUBLE DEFAULT 9.9,
423 TIME INDEX (ts),
424 PRIMARY KEY(host)
425 ) engine=mito;"#;
426 let output = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
427 .await
428 .remove(0)
429 .unwrap();
430
431 match output.data {
432 OutputData::AffectedRows(rows) => assert_eq!(rows, 0),
433 _ => unreachable!(),
434 }
435
436 let sql = r#"CREATE DATABASE tomcat"#;
437 if let Err(e) = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
438 .await
439 .remove(0)
440 {
441 assert!(matches!(e, error::Error::NotSupported { .. }));
442 } else {
443 unreachable!();
444 }
445
446 let sql = r#"SELECT 1; SHOW DATABASES"#;
447 if let Err(e) = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
448 .await
449 .remove(0)
450 {
451 assert!(matches!(e, error::Error::NotSupported { .. }));
452 } else {
453 unreachable!();
454 }
455 }
456}