tests_integration/
instance.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(test)]
16mod tests {
17    use std::borrow::Cow;
18    use std::collections::HashMap;
19    use std::sync::Arc;
20    use std::sync::atomic::AtomicU32;
21
22    use api::v1::region::QueryRequest;
23    use client::OutputData;
24    use common_base::Plugins;
25    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
26    use common_meta::key::table_name::TableNameKey;
27    use common_meta::rpc::router::region_distribution;
28    use common_query::Output;
29    use common_recordbatch::RecordBatches;
30    use common_telemetry::debug;
31    use datafusion_expr::LogicalPlan;
32    use frontend::error::{self, Error, Result};
33    use frontend::instance::Instance;
34    use query::parser::QueryLanguageParser;
35    use query::query_engine::DefaultSerializer;
36    use servers::interceptor::{SqlQueryInterceptor, SqlQueryInterceptorRef};
37    use servers::query_handler::sql::SqlQueryHandler;
38    use session::context::{QueryContext, QueryContextRef};
39    use sql::statements::statement::Statement;
40    use store_api::storage::RegionId;
41    use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
42
43    use crate::standalone::GreptimeDbStandaloneBuilder;
44    use crate::tests;
45    use crate::tests::MockDistributedInstance;
46
47    #[tokio::test(flavor = "multi_thread")]
48    async fn test_standalone_exec_sql() {
49        let standalone = GreptimeDbStandaloneBuilder::new("test_standalone_exec_sql")
50            .build()
51            .await;
52        let instance = standalone.fe_instance();
53
54        let sql = r#"
55            CREATE TABLE demo(
56                host STRING,
57                ts TIMESTAMP,
58                cpu DOUBLE NULL,
59                memory DOUBLE NULL,
60                disk_util DOUBLE DEFAULT 9.9,
61                TIME INDEX (ts),
62                PRIMARY KEY(host)
63            ) engine=mito"#;
64        create_table(instance, sql).await;
65
66        insert_and_query(instance).await;
67
68        drop_table(instance).await;
69    }
70
71    #[tokio::test(flavor = "multi_thread")]
72    async fn test_distributed_exec_sql() {
73        common_telemetry::init_default_ut_logging();
74
75        let distributed = tests::create_distributed_instance("test_distributed_exec_sql").await;
76        let frontend = distributed.frontend();
77        let instance = frontend.as_ref();
78
79        let sql = r#"
80            CREATE TABLE demo(
81                host STRING,
82                ts TIMESTAMP,
83                cpu DOUBLE NULL,
84                memory DOUBLE NULL,
85                disk_util DOUBLE DEFAULT 9.9,
86                TIME INDEX (ts),
87                PRIMARY KEY(host)
88            )
89            PARTITION ON COLUMNS (host) (
90                host < '550-A',
91                host >= '550-A' AND host < '550-W',
92                host >= '550-W' AND host < 'MOSS',
93                host >= 'MOSS'
94            )
95            engine=mito"#;
96        create_table(instance, sql).await;
97
98        insert_and_query(instance).await;
99
100        verify_data_distribution(
101            &distributed,
102            HashMap::from([
103                (
104                    0u32,
105                    "\
106+---------------------+------+
107| ts                  | host |
108+---------------------+------+
109| 2013-12-31T16:00:00 | 490  |
110+---------------------+------+",
111                ),
112                (
113                    1u32,
114                    "\
115+---------------------+-------+
116| ts                  | host  |
117+---------------------+-------+
118| 2022-12-31T16:00:00 | 550-A |
119+---------------------+-------+",
120                ),
121                (
122                    2u32,
123                    "\
124+---------------------+-------+
125| ts                  | host  |
126+---------------------+-------+
127| 2023-12-31T16:00:00 | 550-W |
128+---------------------+-------+",
129                ),
130                (
131                    3u32,
132                    "\
133+---------------------+------+
134| ts                  | host |
135+---------------------+------+
136| 2043-12-31T16:00:00 | MOSS |
137+---------------------+------+",
138                ),
139            ]),
140        )
141        .await;
142
143        drop_table(instance).await;
144
145        verify_table_is_dropped(&distributed).await;
146    }
147
148    async fn query(instance: &Instance, sql: &str) -> Output {
149        SqlQueryHandler::do_query(instance, sql, QueryContext::arc())
150            .await
151            .remove(0)
152            .unwrap()
153    }
154
155    async fn create_table(instance: &Instance, sql: &str) {
156        let output = query(instance, sql).await;
157        let OutputData::AffectedRows(x) = output.data else {
158            unreachable!()
159        };
160        assert_eq!(x, 0);
161    }
162
163    async fn insert_and_query(instance: &Instance) {
164        let sql = r#"INSERT INTO demo(host, cpu, memory, ts) VALUES
165                                ('490', 0.1, 1, 1388505600000),
166                                ('550-A', 1, 100, 1672502400000),
167                                ('550-W', 10000, 1000000, 1704038400000),
168                                ('MOSS', 100000000, 10000000000, 2335190400000)
169                                "#;
170        let output = query(instance, sql).await;
171        let OutputData::AffectedRows(x) = output.data else {
172            unreachable!()
173        };
174        assert_eq!(x, 4);
175
176        let sql = "SELECT * FROM demo WHERE ts > cast(1000000000 as timestamp) ORDER BY host"; // use nanoseconds as where condition
177        let output = query(instance, sql).await;
178        let OutputData::Stream(s) = output.data else {
179            unreachable!()
180        };
181        let batches = common_recordbatch::util::collect_batches(s).await.unwrap();
182        let pretty_print = batches.pretty_print().unwrap();
183        let expected = "\
184+-------+---------------------+-------------+---------------+-----------+
185| host  | ts                  | cpu         | memory        | disk_util |
186+-------+---------------------+-------------+---------------+-----------+
187| 490   | 2013-12-31T16:00:00 | 0.1         | 1.0           | 9.9       |
188| 550-A | 2022-12-31T16:00:00 | 1.0         | 100.0         | 9.9       |
189| 550-W | 2023-12-31T16:00:00 | 10000.0     | 1000000.0     | 9.9       |
190| MOSS  | 2043-12-31T16:00:00 | 100000000.0 | 10000000000.0 | 9.9       |
191+-------+---------------------+-------------+---------------+-----------+";
192        assert_eq!(pretty_print, expected);
193    }
194
195    async fn verify_data_distribution(
196        instance: &MockDistributedInstance,
197        expected_distribution: HashMap<u32, &str>,
198    ) {
199        let manager = instance.table_metadata_manager();
200        let table_id = manager
201            .table_name_manager()
202            .get(TableNameKey::new(
203                DEFAULT_CATALOG_NAME,
204                DEFAULT_SCHEMA_NAME,
205                "demo",
206            ))
207            .await
208            .unwrap()
209            .unwrap()
210            .table_id();
211        debug!("Reading table {table_id}");
212
213        let table_route_value = manager
214            .table_route_manager()
215            .table_route_storage()
216            .get(table_id)
217            .await
218            .unwrap()
219            .unwrap();
220
221        let region_to_dn_map = region_distribution(
222            table_route_value
223                .region_routes()
224                .expect("region routes should be physical"),
225        )
226        .iter()
227        .map(|(k, v)| (v.leader_regions[0], *k))
228        .collect::<HashMap<u32, u64>>();
229        assert!(region_to_dn_map.len() <= instance.datanodes().len());
230
231        let stmt = QueryLanguageParser::parse_sql(
232            "SELECT ts, host FROM demo ORDER BY ts",
233            &QueryContext::arc(),
234        )
235        .unwrap();
236        let plan = instance
237            .frontend()
238            .statement_executor()
239            .plan(&stmt, QueryContext::arc())
240            .await
241            .unwrap();
242        let plan = DFLogicalSubstraitConvertor
243            .encode(&plan, DefaultSerializer)
244            .unwrap();
245
246        for (region, dn) in region_to_dn_map.iter() {
247            let region_server = instance.datanodes().get(dn).unwrap().region_server();
248
249            let region_id = RegionId::new(table_id, *region);
250
251            let stream = region_server
252                .handle_remote_read(
253                    QueryRequest {
254                        region_id: region_id.as_u64(),
255                        plan: plan.to_vec(),
256                        ..Default::default()
257                    },
258                    QueryContext::arc(),
259                )
260                .await
261                .unwrap();
262
263            let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
264            let actual = recordbatches.pretty_print().unwrap();
265
266            let expected = expected_distribution.get(region).unwrap();
267            assert_eq!(&actual, expected);
268        }
269    }
270
271    async fn drop_table(instance: &Instance) {
272        let sql = "DROP TABLE demo";
273        let output = query(instance, sql).await;
274        let OutputData::AffectedRows(x) = output.data else {
275            unreachable!()
276        };
277        assert_eq!(x, 0);
278    }
279
280    async fn verify_table_is_dropped(instance: &MockDistributedInstance) {
281        assert!(
282            instance
283                .frontend()
284                .catalog_manager()
285                .table("greptime", "public", "demo", None)
286                .await
287                .unwrap()
288                .is_none()
289        )
290    }
291
292    #[tokio::test(flavor = "multi_thread")]
293    async fn test_sql_interceptor_plugin() {
294        #[derive(Default)]
295        struct AssertionHook {
296            pub(crate) c: AtomicU32,
297        }
298
299        impl SqlQueryInterceptor for AssertionHook {
300            type Error = Error;
301
302            fn pre_parsing<'a>(
303                &self,
304                query: &'a str,
305                _query_ctx: QueryContextRef,
306            ) -> Result<Cow<'a, str>> {
307                let _ = self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
308                assert!(query.starts_with("CREATE TABLE demo"));
309                Ok(Cow::Borrowed(query))
310            }
311
312            fn post_parsing(
313                &self,
314                statements: Vec<Statement>,
315                _query_ctx: QueryContextRef,
316            ) -> Result<Vec<Statement>> {
317                let _ = self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
318                assert!(matches!(statements[0], Statement::CreateTable(_)));
319                Ok(statements)
320            }
321
322            fn pre_execute(
323                &self,
324                _statement: &Statement,
325                _plan: Option<&LogicalPlan>,
326                _query_ctx: QueryContextRef,
327            ) -> Result<()> {
328                let _ = self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
329                Ok(())
330            }
331
332            fn post_execute(
333                &self,
334                mut output: Output,
335                _query_ctx: QueryContextRef,
336            ) -> Result<Output> {
337                let _ = self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
338                match &mut output.data {
339                    OutputData::AffectedRows(rows) => {
340                        assert_eq!(*rows, 0);
341                        // update output result
342                        *rows = 10;
343                    }
344                    _ => unreachable!(),
345                }
346                Ok(output)
347            }
348        }
349
350        let plugins = Plugins::new();
351        let counter_hook = Arc::new(AssertionHook::default());
352        plugins.insert::<SqlQueryInterceptorRef<Error>>(counter_hook.clone());
353
354        let standalone = GreptimeDbStandaloneBuilder::new("test_sql_interceptor_plugin")
355            .with_plugin(plugins)
356            .build()
357            .await;
358        let instance = standalone.fe_instance().clone();
359
360        let sql = r#"CREATE TABLE demo(
361                            host STRING,
362                            ts TIMESTAMP,
363                            cpu DOUBLE NULL,
364                            memory DOUBLE NULL,
365                            disk_util DOUBLE DEFAULT 9.9,
366                            TIME INDEX (ts),
367                            PRIMARY KEY(host)
368                        ) engine=mito;"#;
369        let output = SqlQueryHandler::do_query(&*instance, sql, QueryContext::arc())
370            .await
371            .remove(0)
372            .unwrap();
373
374        // assert that the hook is called 3 times
375        assert_eq!(4, counter_hook.c.load(std::sync::atomic::Ordering::Relaxed));
376        match output.data {
377            OutputData::AffectedRows(rows) => assert_eq!(rows, 10),
378            _ => unreachable!(),
379        }
380    }
381
382    #[tokio::test(flavor = "multi_thread")]
383    async fn test_disable_db_operation_plugin() {
384        #[derive(Default)]
385        struct DisableDBOpHook;
386
387        impl SqlQueryInterceptor for DisableDBOpHook {
388            type Error = Error;
389
390            fn post_parsing(
391                &self,
392                statements: Vec<Statement>,
393                _query_ctx: QueryContextRef,
394            ) -> Result<Vec<Statement>> {
395                for s in &statements {
396                    match s {
397                        Statement::CreateDatabase(_) | Statement::ShowDatabases(_) => {
398                            return Err(Error::NotSupported {
399                                feat: "Database operations".to_owned(),
400                            });
401                        }
402                        _ => {}
403                    }
404                }
405
406                Ok(statements)
407            }
408        }
409
410        let query_ctx = QueryContext::arc();
411
412        let plugins = Plugins::new();
413        let hook = Arc::new(DisableDBOpHook);
414        plugins.insert::<SqlQueryInterceptorRef<Error>>(hook.clone());
415
416        let standalone = GreptimeDbStandaloneBuilder::new("test_disable_db_operation_plugin")
417            .with_plugin(plugins)
418            .build()
419            .await;
420        let instance = standalone.fe_instance().clone();
421
422        let sql = r#"CREATE TABLE demo(
423                            host STRING,
424                            ts TIMESTAMP,
425                            cpu DOUBLE NULL,
426                            memory DOUBLE NULL,
427                            disk_util DOUBLE DEFAULT 9.9,
428                            TIME INDEX (ts),
429                            PRIMARY KEY(host)
430                        ) engine=mito;"#;
431        let output = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
432            .await
433            .remove(0)
434            .unwrap();
435
436        match output.data {
437            OutputData::AffectedRows(rows) => assert_eq!(rows, 0),
438            _ => unreachable!(),
439        }
440
441        let sql = r#"CREATE DATABASE tomcat"#;
442        if let Err(e) = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
443            .await
444            .remove(0)
445        {
446            assert!(matches!(e, error::Error::NotSupported { .. }));
447        } else {
448            unreachable!();
449        }
450
451        let sql = r#"SELECT 1; SHOW DATABASES"#;
452        if let Err(e) = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
453            .await
454            .remove(0)
455        {
456            assert!(matches!(e, error::Error::NotSupported { .. }));
457        } else {
458            unreachable!();
459        }
460    }
461}