tests_integration/
instance.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(test)]
16mod tests {
17    use std::borrow::Cow;
18    use std::collections::HashMap;
19    use std::sync::atomic::AtomicU32;
20    use std::sync::Arc;
21
22    use api::v1::region::QueryRequest;
23    use client::OutputData;
24    use common_base::Plugins;
25    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
26    use common_meta::key::table_name::TableNameKey;
27    use common_meta::rpc::router::region_distribution;
28    use common_query::Output;
29    use common_recordbatch::RecordBatches;
30    use common_telemetry::debug;
31    use datafusion_expr::LogicalPlan;
32    use frontend::error::{self, Error, Result};
33    use frontend::instance::Instance;
34    use query::parser::QueryLanguageParser;
35    use query::query_engine::DefaultSerializer;
36    use servers::interceptor::{SqlQueryInterceptor, SqlQueryInterceptorRef};
37    use servers::query_handler::sql::SqlQueryHandler;
38    use session::context::{QueryContext, QueryContextRef};
39    use sql::statements::statement::Statement;
40    use store_api::storage::RegionId;
41    use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
42
43    use crate::standalone::GreptimeDbStandaloneBuilder;
44    use crate::tests;
45    use crate::tests::MockDistributedInstance;
46
47    #[tokio::test(flavor = "multi_thread")]
48    async fn test_standalone_exec_sql() {
49        let standalone = GreptimeDbStandaloneBuilder::new("test_standalone_exec_sql")
50            .build()
51            .await;
52        let instance = standalone.fe_instance();
53
54        let sql = r#"
55            CREATE TABLE demo(
56                host STRING,
57                ts TIMESTAMP,
58                cpu DOUBLE NULL,
59                memory DOUBLE NULL,
60                disk_util DOUBLE DEFAULT 9.9,
61                TIME INDEX (ts),
62                PRIMARY KEY(host)
63            ) engine=mito"#;
64        create_table(instance, sql).await;
65
66        insert_and_query(instance).await;
67
68        drop_table(instance).await;
69    }
70
71    #[tokio::test(flavor = "multi_thread")]
72    async fn test_distributed_exec_sql() {
73        common_telemetry::init_default_ut_logging();
74
75        let distributed = tests::create_distributed_instance("test_distributed_exec_sql").await;
76        let frontend = distributed.frontend();
77        let instance = frontend.as_ref();
78
79        let sql = r#"
80            CREATE TABLE demo(
81                host STRING,
82                ts TIMESTAMP,
83                cpu DOUBLE NULL,
84                memory DOUBLE NULL,
85                disk_util DOUBLE DEFAULT 9.9,
86                TIME INDEX (ts),
87                PRIMARY KEY(host)
88            )
89            PARTITION ON COLUMNS (host) (
90                host < '550-A',
91                host >= '550-A' AND host < '550-W',
92                host >= '550-W' AND host < 'MOSS',
93                host >= 'MOSS'
94            )
95            engine=mito"#;
96        create_table(instance, sql).await;
97
98        insert_and_query(instance).await;
99
100        verify_data_distribution(
101            &distributed,
102            HashMap::from([
103                (
104                    0u32,
105                    "\
106+---------------------+------+
107| ts                  | host |
108+---------------------+------+
109| 2013-12-31T16:00:00 | 490  |
110+---------------------+------+",
111                ),
112                (
113                    1u32,
114                    "\
115+---------------------+-------+
116| ts                  | host  |
117+---------------------+-------+
118| 2022-12-31T16:00:00 | 550-A |
119+---------------------+-------+",
120                ),
121                (
122                    2u32,
123                    "\
124+---------------------+-------+
125| ts                  | host  |
126+---------------------+-------+
127| 2023-12-31T16:00:00 | 550-W |
128+---------------------+-------+",
129                ),
130                (
131                    3u32,
132                    "\
133+---------------------+------+
134| ts                  | host |
135+---------------------+------+
136| 2043-12-31T16:00:00 | MOSS |
137+---------------------+------+",
138                ),
139            ]),
140        )
141        .await;
142
143        drop_table(instance).await;
144
145        verify_table_is_dropped(&distributed).await;
146    }
147
148    async fn query(instance: &Instance, sql: &str) -> Output {
149        SqlQueryHandler::do_query(instance, sql, QueryContext::arc())
150            .await
151            .remove(0)
152            .unwrap()
153    }
154
155    async fn create_table(instance: &Instance, sql: &str) {
156        let output = query(instance, sql).await;
157        let OutputData::AffectedRows(x) = output.data else {
158            unreachable!()
159        };
160        assert_eq!(x, 0);
161    }
162
163    async fn insert_and_query(instance: &Instance) {
164        let sql = r#"INSERT INTO demo(host, cpu, memory, ts) VALUES
165                                ('490', 0.1, 1, 1388505600000),
166                                ('550-A', 1, 100, 1672502400000),
167                                ('550-W', 10000, 1000000, 1704038400000),
168                                ('MOSS', 100000000, 10000000000, 2335190400000)
169                                "#;
170        let output = query(instance, sql).await;
171        let OutputData::AffectedRows(x) = output.data else {
172            unreachable!()
173        };
174        assert_eq!(x, 4);
175
176        let sql = "SELECT * FROM demo WHERE ts > cast(1000000000 as timestamp) ORDER BY host"; // use nanoseconds as where condition
177        let output = query(instance, sql).await;
178        let OutputData::Stream(s) = output.data else {
179            unreachable!()
180        };
181        let batches = common_recordbatch::util::collect_batches(s).await.unwrap();
182        let pretty_print = batches.pretty_print().unwrap();
183        let expected = "\
184+-------+---------------------+-------------+---------------+-----------+
185| host  | ts                  | cpu         | memory        | disk_util |
186+-------+---------------------+-------------+---------------+-----------+
187| 490   | 2013-12-31T16:00:00 | 0.1         | 1.0           | 9.9       |
188| 550-A | 2022-12-31T16:00:00 | 1.0         | 100.0         | 9.9       |
189| 550-W | 2023-12-31T16:00:00 | 10000.0     | 1000000.0     | 9.9       |
190| MOSS  | 2043-12-31T16:00:00 | 100000000.0 | 10000000000.0 | 9.9       |
191+-------+---------------------+-------------+---------------+-----------+";
192        assert_eq!(pretty_print, expected);
193    }
194
195    async fn verify_data_distribution(
196        instance: &MockDistributedInstance,
197        expected_distribution: HashMap<u32, &str>,
198    ) {
199        let manager = instance.table_metadata_manager();
200        let table_id = manager
201            .table_name_manager()
202            .get(TableNameKey::new(
203                DEFAULT_CATALOG_NAME,
204                DEFAULT_SCHEMA_NAME,
205                "demo",
206            ))
207            .await
208            .unwrap()
209            .unwrap()
210            .table_id();
211        debug!("Reading table {table_id}");
212
213        let table_route_value = manager
214            .table_route_manager()
215            .table_route_storage()
216            .get(table_id)
217            .await
218            .unwrap()
219            .unwrap();
220
221        let region_to_dn_map = region_distribution(
222            table_route_value
223                .region_routes()
224                .expect("region routes should be physical"),
225        )
226        .iter()
227        .map(|(k, v)| (v.leader_regions[0], *k))
228        .collect::<HashMap<u32, u64>>();
229        assert!(region_to_dn_map.len() <= instance.datanodes().len());
230
231        let stmt = QueryLanguageParser::parse_sql(
232            "SELECT ts, host FROM demo ORDER BY ts",
233            &QueryContext::arc(),
234        )
235        .unwrap();
236        let plan = instance
237            .frontend()
238            .statement_executor()
239            .plan(&stmt, QueryContext::arc())
240            .await
241            .unwrap();
242        let plan = DFLogicalSubstraitConvertor
243            .encode(&plan, DefaultSerializer)
244            .unwrap();
245
246        for (region, dn) in region_to_dn_map.iter() {
247            let region_server = instance.datanodes().get(dn).unwrap().region_server();
248
249            let region_id = RegionId::new(table_id, *region);
250
251            let stream = region_server
252                .handle_remote_read(QueryRequest {
253                    region_id: region_id.as_u64(),
254                    plan: plan.to_vec(),
255                    ..Default::default()
256                })
257                .await
258                .unwrap();
259
260            let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
261            let actual = recordbatches.pretty_print().unwrap();
262
263            let expected = expected_distribution.get(region).unwrap();
264            assert_eq!(&actual, expected);
265        }
266    }
267
268    async fn drop_table(instance: &Instance) {
269        let sql = "DROP TABLE demo";
270        let output = query(instance, sql).await;
271        let OutputData::AffectedRows(x) = output.data else {
272            unreachable!()
273        };
274        assert_eq!(x, 0);
275    }
276
277    async fn verify_table_is_dropped(instance: &MockDistributedInstance) {
278        assert!(instance
279            .frontend()
280            .catalog_manager()
281            .table("greptime", "public", "demo", None)
282            .await
283            .unwrap()
284            .is_none())
285    }
286
287    #[tokio::test(flavor = "multi_thread")]
288    async fn test_sql_interceptor_plugin() {
289        #[derive(Default)]
290        struct AssertionHook {
291            pub(crate) c: AtomicU32,
292        }
293
294        impl SqlQueryInterceptor for AssertionHook {
295            type Error = Error;
296
297            fn pre_parsing<'a>(
298                &self,
299                query: &'a str,
300                _query_ctx: QueryContextRef,
301            ) -> Result<Cow<'a, str>> {
302                let _ = self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
303                assert!(query.starts_with("CREATE TABLE demo"));
304                Ok(Cow::Borrowed(query))
305            }
306
307            fn post_parsing(
308                &self,
309                statements: Vec<Statement>,
310                _query_ctx: QueryContextRef,
311            ) -> Result<Vec<Statement>> {
312                let _ = self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
313                assert!(matches!(statements[0], Statement::CreateTable(_)));
314                Ok(statements)
315            }
316
317            fn pre_execute(
318                &self,
319                _statement: &Statement,
320                _plan: Option<&LogicalPlan>,
321                _query_ctx: QueryContextRef,
322            ) -> Result<()> {
323                let _ = self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
324                Ok(())
325            }
326
327            fn post_execute(
328                &self,
329                mut output: Output,
330                _query_ctx: QueryContextRef,
331            ) -> Result<Output> {
332                let _ = self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
333                match &mut output.data {
334                    OutputData::AffectedRows(rows) => {
335                        assert_eq!(*rows, 0);
336                        // update output result
337                        *rows = 10;
338                    }
339                    _ => unreachable!(),
340                }
341                Ok(output)
342            }
343        }
344
345        let plugins = Plugins::new();
346        let counter_hook = Arc::new(AssertionHook::default());
347        plugins.insert::<SqlQueryInterceptorRef<Error>>(counter_hook.clone());
348
349        let standalone = GreptimeDbStandaloneBuilder::new("test_sql_interceptor_plugin")
350            .with_plugin(plugins)
351            .build()
352            .await;
353        let instance = standalone.fe_instance().clone();
354
355        let sql = r#"CREATE TABLE demo(
356                            host STRING,
357                            ts TIMESTAMP,
358                            cpu DOUBLE NULL,
359                            memory DOUBLE NULL,
360                            disk_util DOUBLE DEFAULT 9.9,
361                            TIME INDEX (ts),
362                            PRIMARY KEY(host)
363                        ) engine=mito;"#;
364        let output = SqlQueryHandler::do_query(&*instance, sql, QueryContext::arc())
365            .await
366            .remove(0)
367            .unwrap();
368
369        // assert that the hook is called 3 times
370        assert_eq!(4, counter_hook.c.load(std::sync::atomic::Ordering::Relaxed));
371        match output.data {
372            OutputData::AffectedRows(rows) => assert_eq!(rows, 10),
373            _ => unreachable!(),
374        }
375    }
376
377    #[tokio::test(flavor = "multi_thread")]
378    async fn test_disable_db_operation_plugin() {
379        #[derive(Default)]
380        struct DisableDBOpHook;
381
382        impl SqlQueryInterceptor for DisableDBOpHook {
383            type Error = Error;
384
385            fn post_parsing(
386                &self,
387                statements: Vec<Statement>,
388                _query_ctx: QueryContextRef,
389            ) -> Result<Vec<Statement>> {
390                for s in &statements {
391                    match s {
392                        Statement::CreateDatabase(_) | Statement::ShowDatabases(_) => {
393                            return Err(Error::NotSupported {
394                                feat: "Database operations".to_owned(),
395                            })
396                        }
397                        _ => {}
398                    }
399                }
400
401                Ok(statements)
402            }
403        }
404
405        let query_ctx = QueryContext::arc();
406
407        let plugins = Plugins::new();
408        let hook = Arc::new(DisableDBOpHook);
409        plugins.insert::<SqlQueryInterceptorRef<Error>>(hook.clone());
410
411        let standalone = GreptimeDbStandaloneBuilder::new("test_disable_db_operation_plugin")
412            .with_plugin(plugins)
413            .build()
414            .await;
415        let instance = standalone.fe_instance().clone();
416
417        let sql = r#"CREATE TABLE demo(
418                            host STRING,
419                            ts TIMESTAMP,
420                            cpu DOUBLE NULL,
421                            memory DOUBLE NULL,
422                            disk_util DOUBLE DEFAULT 9.9,
423                            TIME INDEX (ts),
424                            PRIMARY KEY(host)
425                        ) engine=mito;"#;
426        let output = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
427            .await
428            .remove(0)
429            .unwrap();
430
431        match output.data {
432            OutputData::AffectedRows(rows) => assert_eq!(rows, 0),
433            _ => unreachable!(),
434        }
435
436        let sql = r#"CREATE DATABASE tomcat"#;
437        if let Err(e) = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
438            .await
439            .remove(0)
440        {
441            assert!(matches!(e, error::Error::NotSupported { .. }));
442        } else {
443            unreachable!();
444        }
445
446        let sql = r#"SELECT 1; SHOW DATABASES"#;
447        if let Err(e) = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
448            .await
449            .remove(0)
450        {
451            assert!(matches!(e, error::Error::NotSupported { .. }));
452        } else {
453            unreachable!();
454        }
455    }
456}