// File: tests-integration/src/instance.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(test)]
16mod tests {
17    use std::borrow::Cow;
18    use std::collections::HashMap;
19    use std::sync::Arc;
20    use std::sync::atomic::AtomicU32;
21
22    use api::v1::region::QueryRequest;
23    use client::OutputData;
24    use common_base::Plugins;
25    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
26    use common_error::ext::ErrorExt;
27    use common_error::status_code::StatusCode;
28    use common_meta::key::table_name::TableNameKey;
29    use common_meta::rpc::router::region_distribution;
30    use common_query::Output;
31    use common_recordbatch::RecordBatches;
32    use common_telemetry::debug;
33    use datafusion_expr::LogicalPlan;
34    use frontend::error::{Error, Result};
35    use frontend::instance::Instance;
36    use query::parser::{QueryLanguageParser, QueryStatement};
37    use query::query_engine::DefaultSerializer;
38    use servers::interceptor::{SqlQueryInterceptor, SqlQueryInterceptorRef};
39    use servers::query_handler::sql::SqlQueryHandler;
40    use session::context::{QueryContext, QueryContextRef};
41    use sql::statements::statement::Statement;
42    use store_api::storage::RegionId;
43    use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
44
45    use crate::standalone::GreptimeDbStandaloneBuilder;
46    use crate::tests;
47    use crate::tests::MockDistributedInstance;
48
49    #[tokio::test(flavor = "multi_thread")]
50    async fn test_standalone_exec_sql() {
51        let standalone = GreptimeDbStandaloneBuilder::new("test_standalone_exec_sql")
52            .build()
53            .await;
54        let instance = standalone.fe_instance();
55
56        let sql = r#"
57            CREATE TABLE demo(
58                host STRING,
59                ts TIMESTAMP,
60                cpu DOUBLE NULL,
61                memory DOUBLE NULL,
62                disk_util DOUBLE DEFAULT 9.9,
63                TIME INDEX (ts),
64                PRIMARY KEY(host)
65            ) engine=mito"#;
66        create_table(instance, sql).await;
67
68        insert_and_query(instance).await;
69
70        drop_table(instance).await;
71    }
72
73    #[tokio::test(flavor = "multi_thread")]
74    async fn test_distributed_exec_sql() {
75        common_telemetry::init_default_ut_logging();
76
77        let distributed = tests::create_distributed_instance("test_distributed_exec_sql").await;
78        let frontend = distributed.frontend();
79        let instance = frontend.as_ref();
80
81        let sql = r#"
82            CREATE TABLE demo(
83                host STRING,
84                ts TIMESTAMP,
85                cpu DOUBLE NULL,
86                memory DOUBLE NULL,
87                disk_util DOUBLE DEFAULT 9.9,
88                TIME INDEX (ts),
89                PRIMARY KEY(host)
90            )
91            PARTITION ON COLUMNS (host) (
92                host < '550-A',
93                host >= '550-A' AND host < '550-W',
94                host >= '550-W' AND host < 'MOSS',
95                host >= 'MOSS'
96            )
97            engine=mito"#;
98        create_table(instance, sql).await;
99
100        insert_and_query(instance).await;
101
102        verify_data_distribution(
103            &distributed,
104            HashMap::from([
105                (
106                    0u32,
107                    "\
108+---------------------+------+
109| ts                  | host |
110+---------------------+------+
111| 2013-12-31T16:00:00 | 490  |
112+---------------------+------+",
113                ),
114                (
115                    1u32,
116                    "\
117+---------------------+-------+
118| ts                  | host  |
119+---------------------+-------+
120| 2022-12-31T16:00:00 | 550-A |
121+---------------------+-------+",
122                ),
123                (
124                    2u32,
125                    "\
126+---------------------+-------+
127| ts                  | host  |
128+---------------------+-------+
129| 2023-12-31T16:00:00 | 550-W |
130+---------------------+-------+",
131                ),
132                (
133                    3u32,
134                    "\
135+---------------------+------+
136| ts                  | host |
137+---------------------+------+
138| 2043-12-31T16:00:00 | MOSS |
139+---------------------+------+",
140                ),
141            ]),
142        )
143        .await;
144
145        drop_table(instance).await;
146
147        verify_table_is_dropped(&distributed).await;
148    }
149
150    async fn query(instance: &Instance, sql: &str) -> Output {
151        SqlQueryHandler::do_query(instance, sql, QueryContext::arc())
152            .await
153            .remove(0)
154            .unwrap()
155    }
156
157    async fn create_table(instance: &Instance, sql: &str) {
158        let output = query(instance, sql).await;
159        let OutputData::AffectedRows(x) = output.data else {
160            unreachable!()
161        };
162        assert_eq!(x, 0);
163    }
164
165    async fn insert_and_query(instance: &Instance) {
166        let sql = r#"INSERT INTO demo(host, cpu, memory, ts) VALUES
167                                ('490', 0.1, 1, 1388505600000),
168                                ('550-A', 1, 100, 1672502400000),
169                                ('550-W', 10000, 1000000, 1704038400000),
170                                ('MOSS', 100000000, 10000000000, 2335190400000)
171                                "#;
172        let output = query(instance, sql).await;
173        let OutputData::AffectedRows(x) = output.data else {
174            unreachable!()
175        };
176        assert_eq!(x, 4);
177
178        let sql = "SELECT * FROM demo WHERE ts > cast(1000000000 as timestamp) ORDER BY host"; // use nanoseconds as where condition
179        let output = query(instance, sql).await;
180        let OutputData::Stream(s) = output.data else {
181            unreachable!()
182        };
183        let batches = common_recordbatch::util::collect_batches(s).await.unwrap();
184        let pretty_print = batches.pretty_print().unwrap();
185        let expected = "\
186+-------+---------------------+-------------+---------------+-----------+
187| host  | ts                  | cpu         | memory        | disk_util |
188+-------+---------------------+-------------+---------------+-----------+
189| 490   | 2013-12-31T16:00:00 | 0.1         | 1.0           | 9.9       |
190| 550-A | 2022-12-31T16:00:00 | 1.0         | 100.0         | 9.9       |
191| 550-W | 2023-12-31T16:00:00 | 10000.0     | 1000000.0     | 9.9       |
192| MOSS  | 2043-12-31T16:00:00 | 100000000.0 | 10000000000.0 | 9.9       |
193+-------+---------------------+-------------+---------------+-----------+";
194        assert_eq!(pretty_print, expected);
195    }
196
    /// Reads the `demo` table region-by-region, directly from each
    /// datanode's region server, and checks every region holds exactly the
    /// rows given in `expected_distribution` (region number -> expected
    /// pretty-printed record batches).
    async fn verify_data_distribution(
        instance: &MockDistributedInstance,
        expected_distribution: HashMap<u32, &str>,
    ) {
        // Resolve the table id of `demo` from table-name metadata.
        let manager = instance.table_metadata_manager();
        let table_id = manager
            .table_name_manager()
            .get(TableNameKey::new(
                DEFAULT_CATALOG_NAME,
                DEFAULT_SCHEMA_NAME,
                "demo",
            ))
            .await
            .unwrap()
            .unwrap()
            .table_id();
        debug!("Reading table {table_id}");

        // Fetch the stored table route to learn which datanode hosts which
        // region.
        let table_route_value = manager
            .table_route_manager()
            .table_route_storage()
            .get(table_id)
            .await
            .unwrap()
            .unwrap();

        // Invert the datanode -> regions distribution into a
        // region -> datanode map. NOTE(review): only leader_regions[0] of
        // each datanode is kept, which assumes at most one leader region per
        // datanode — fine for this test's layout, but worth confirming if
        // the cluster shape changes.
        let region_to_dn_map = region_distribution(
            table_route_value
                .region_routes()
                .expect("region routes should be physical"),
        )
        .iter()
        .map(|(k, v)| (v.leader_regions[0], *k))
        .collect::<HashMap<u32, u64>>();
        assert!(region_to_dn_map.len() <= instance.datanodes().len());

        // Build one logical plan for the scan and encode it via substrait so
        // it can be shipped to each datanode's region server.
        let stmt = QueryLanguageParser::parse_sql(
            "SELECT ts, host FROM demo ORDER BY ts",
            &QueryContext::arc(),
        )
        .unwrap();
        let plan = instance
            .frontend()
            .statement_executor()
            .plan(&stmt, QueryContext::arc())
            .await
            .unwrap();
        let plan = DFLogicalSubstraitConvertor
            .encode(&plan, DefaultSerializer)
            .unwrap();

        // Run the same plan against every region and compare the
        // pretty-printed result with the expectation for that region.
        for (region, dn) in region_to_dn_map.iter() {
            let region_server = instance.datanodes().get(dn).unwrap().region_server();

            let region_id = RegionId::new(table_id, *region);

            let stream = region_server
                .handle_remote_read(
                    QueryRequest {
                        region_id: region_id.as_u64(),
                        plan: plan.to_vec(),
                        ..Default::default()
                    },
                    QueryContext::arc(),
                )
                .await
                .unwrap();

            let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
            let actual = recordbatches.pretty_print().unwrap();

            let expected = expected_distribution.get(region).unwrap();
            assert_eq!(&actual, expected);
        }
    }
272
273    async fn drop_table(instance: &Instance) {
274        let sql = "DROP TABLE demo";
275        let output = query(instance, sql).await;
276        let OutputData::AffectedRows(x) = output.data else {
277            unreachable!()
278        };
279        assert_eq!(x, 0);
280    }
281
282    async fn verify_table_is_dropped(instance: &MockDistributedInstance) {
283        assert!(
284            instance
285                .frontend()
286                .catalog_manager()
287                .table("greptime", "public", "demo", None)
288                .await
289                .unwrap()
290                .is_none()
291        )
292    }
293
    /// End-to-end check that every `SqlQueryInterceptor` phase fires on the
    /// `do_query` path, and that `post_execute` may rewrite the output.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_sql_interceptor_plugin() {
        // Counts hook invocations and asserts on the data seen per phase.
        #[derive(Default)]
        struct AssertionHook {
            // Number of hook methods invoked so far.
            pub(crate) c: AtomicU32,
        }

        impl SqlQueryInterceptor for AssertionHook {
            type Error = Error;

            // Phase 1: sees the raw SQL text before parsing.
            fn pre_parsing<'a>(
                &self,
                query: &'a str,
                _query_ctx: QueryContextRef,
            ) -> Result<Cow<'a, str>> {
                let _ = self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                assert!(query.starts_with("CREATE TABLE demo"));
                Ok(Cow::Borrowed(query))
            }

            // Phase 2: sees the parsed statements.
            fn post_parsing(
                &self,
                statements: Vec<Statement>,
                _query_ctx: QueryContextRef,
            ) -> Result<Vec<Statement>> {
                let _ = self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                assert!(matches!(statements[0], Statement::CreateTable(_)));
                Ok(statements)
            }

            // Phase 3: runs just before execution.
            fn pre_execute(
                &self,
                _statement: &Statement,
                _plan: Option<&LogicalPlan>,
                _query_ctx: QueryContextRef,
            ) -> Result<()> {
                let _ = self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                Ok(())
            }

            // Phase 4: may rewrite the output; here it bumps the
            // affected-row count so the caller can observe the interception.
            fn post_execute(
                &self,
                mut output: Output,
                _query_ctx: QueryContextRef,
            ) -> Result<Output> {
                let _ = self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                match &mut output.data {
                    OutputData::AffectedRows(rows) => {
                        assert_eq!(*rows, 0);
                        // update output result
                        *rows = 10;
                    }
                    _ => unreachable!(),
                }
                Ok(output)
            }
        }

        let plugins = Plugins::new();
        let counter_hook = Arc::new(AssertionHook::default());
        plugins.insert::<SqlQueryInterceptorRef<Error>>(counter_hook.clone());

        let standalone = GreptimeDbStandaloneBuilder::new("test_sql_interceptor_plugin")
            .with_plugin(plugins)
            .build()
            .await;
        let instance = standalone.fe_instance().clone();

        let sql = r#"CREATE TABLE demo(
                            host STRING,
                            ts TIMESTAMP,
                            cpu DOUBLE NULL,
                            memory DOUBLE NULL,
                            disk_util DOUBLE DEFAULT 9.9,
                            TIME INDEX (ts),
                            PRIMARY KEY(host)
                        ) engine=mito;"#;
        let output = SqlQueryHandler::do_query(&*instance, sql, QueryContext::arc())
            .await
            .remove(0)
            .unwrap();

        // assert that the hook is called once per phase — 4 times in total
        // (pre_parsing, post_parsing, pre_execute, post_execute)
        assert_eq!(4, counter_hook.c.load(std::sync::atomic::Ordering::Relaxed));
        match output.data {
            OutputData::AffectedRows(rows) => assert_eq!(rows, 10),
            _ => unreachable!(),
        }
    }
383
    /// Verifies that interceptors also fire on the `do_exec_plan` path,
    /// where a pre-built logical plan (not raw SQL text) is executed.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_exec_plan_interceptor_plugin() {
        use std::sync::atomic::{AtomicBool, Ordering};

        // Flags flipped by the execution-phase hooks.
        #[derive(Default)]
        struct ExecPlanHook {
            pub(crate) pre_execute_called: AtomicBool,
            pub(crate) post_execute_called: AtomicBool,
        }

        impl SqlQueryInterceptor for ExecPlanHook {
            type Error = Error;

            fn pre_execute(
                &self,
                _statement: &Statement,
                _plan: Option<&LogicalPlan>,
                _query_ctx: QueryContextRef,
            ) -> Result<()> {
                self.pre_execute_called.store(true, Ordering::Relaxed);
                Ok(())
            }

            fn post_execute(&self, output: Output, _query_ctx: QueryContextRef) -> Result<Output> {
                self.post_execute_called.store(true, Ordering::Relaxed);
                Ok(output)
            }
        }

        let plugins = Plugins::new();
        let hook = Arc::new(ExecPlanHook::default());
        plugins.insert::<SqlQueryInterceptorRef<Error>>(hook.clone());

        let standalone = GreptimeDbStandaloneBuilder::new("test_exec_plan_interceptor_plugin")
            .with_plugin(plugins)
            .build()
            .await;
        let instance = standalone.fe_instance().clone();

        // Create the table through the ordinary do_query path.
        // NOTE(review): this CREATE also runs with the hook installed, so
        // the flags may already be set before do_exec_plan runs; consider
        // resetting them here to isolate the do_exec_plan path — confirm.
        let sql = r#"CREATE TABLE demo(
                            host STRING,
                            ts TIMESTAMP,
                            cpu DOUBLE NULL,
                            TIME INDEX (ts),
                            PRIMARY KEY(host)
                        ) engine=mito;"#;
        SqlQueryHandler::do_query(&*instance, sql, QueryContext::arc())
            .await
            .remove(0)
            .unwrap();

        // Plan a SELECT manually, then execute the plan directly.
        let query_ctx = QueryContext::arc();
        let stmt = QueryLanguageParser::parse_sql("SELECT * FROM demo", &query_ctx).unwrap();
        let plan = instance
            .statement_executor()
            .plan(&stmt, query_ctx.clone())
            .await
            .unwrap();
        let QueryStatement::Sql(sql_stmt) = stmt else {
            unreachable!()
        };

        SqlQueryHandler::do_exec_plan(&*instance, plan, Some(sql_stmt), query_ctx.clone())
            .await
            .unwrap();

        assert!(
            hook.pre_execute_called.load(Ordering::Relaxed),
            "pre_execute should be called for do_exec_plan"
        );
        assert!(
            hook.post_execute_called.load(Ordering::Relaxed),
            "post_execute should be called for do_exec_plan"
        );
    }
459
460    #[tokio::test(flavor = "multi_thread")]
461    async fn test_disable_db_operation_plugin() {
462        #[derive(Default)]
463        struct DisableDBOpHook;
464
465        impl SqlQueryInterceptor for DisableDBOpHook {
466            type Error = Error;
467
468            fn post_parsing(
469                &self,
470                statements: Vec<Statement>,
471                _query_ctx: QueryContextRef,
472            ) -> Result<Vec<Statement>> {
473                for s in &statements {
474                    match s {
475                        Statement::CreateDatabase(_) | Statement::ShowDatabases(_) => {
476                            return Err(Error::NotSupported {
477                                feat: "Database operations".to_owned(),
478                            });
479                        }
480                        _ => {}
481                    }
482                }
483
484                Ok(statements)
485            }
486        }
487
488        let query_ctx = QueryContext::arc();
489
490        let plugins = Plugins::new();
491        let hook = Arc::new(DisableDBOpHook);
492        plugins.insert::<SqlQueryInterceptorRef<Error>>(hook.clone());
493
494        let standalone = GreptimeDbStandaloneBuilder::new("test_disable_db_operation_plugin")
495            .with_plugin(plugins)
496            .build()
497            .await;
498        let instance = standalone.fe_instance().clone();
499
500        let sql = r#"CREATE TABLE demo(
501                            host STRING,
502                            ts TIMESTAMP,
503                            cpu DOUBLE NULL,
504                            memory DOUBLE NULL,
505                            disk_util DOUBLE DEFAULT 9.9,
506                            TIME INDEX (ts),
507                            PRIMARY KEY(host)
508                        ) engine=mito;"#;
509        let output = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
510            .await
511            .remove(0)
512            .unwrap();
513
514        match output.data {
515            OutputData::AffectedRows(rows) => assert_eq!(rows, 0),
516            _ => unreachable!(),
517        }
518
519        let sql = r#"CREATE DATABASE tomcat"#;
520        if let Err(e) = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
521            .await
522            .remove(0)
523        {
524            assert_eq!(e.status_code(), StatusCode::Unsupported);
525        } else {
526            unreachable!();
527        }
528
529        let sql = r#"SELECT 1; SHOW DATABASES"#;
530        if let Err(e) = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
531            .await
532            .remove(0)
533        {
534            assert_eq!(e.status_code(), StatusCode::Unsupported);
535        } else {
536            unreachable!();
537        }
538    }
539}