tests_integration/instance.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#[cfg(test)]
mod tests {
    use std::borrow::Cow;
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::sync::atomic::AtomicU32;

    use api::v1::region::QueryRequest;
    use client::OutputData;
    use common_base::Plugins;
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use common_error::ext::ErrorExt;
    use common_error::status_code::StatusCode;
    use common_meta::key::table_name::TableNameKey;
    use common_meta::rpc::router::region_distribution;
    use common_query::Output;
    use common_recordbatch::RecordBatches;
    use common_telemetry::debug;
    use datafusion_expr::LogicalPlan;
    use frontend::error::{Error, Result};
    use frontend::instance::Instance;
    use query::parser::QueryLanguageParser;
    use query::query_engine::DefaultSerializer;
    use servers::interceptor::{SqlQueryInterceptor, SqlQueryInterceptorRef};
    use servers::query_handler::sql::SqlQueryHandler;
    use session::context::{QueryContext, QueryContextRef};
    use sql::statements::statement::Statement;
    use store_api::storage::RegionId;
    use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};

    use crate::standalone::GreptimeDbStandaloneBuilder;
    use crate::tests;
    use crate::tests::MockDistributedInstance;

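    /// End-to-end standalone check: create the `demo` table, insert a few rows
    /// and read them back, then drop the table.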
    #[tokio::test(flavor = "multi_thread")]
    async fn test_standalone_exec_sql() {
        let standalone = GreptimeDbStandaloneBuilder::new("test_standalone_exec_sql")
            .build()
            .await;
        let instance = standalone.fe_instance();

        let sql = r#"
            CREATE TABLE demo(
                host STRING,
                ts TIMESTAMP,
                cpu DOUBLE NULL,
                memory DOUBLE NULL,
                disk_util DOUBLE DEFAULT 9.9,
                TIME INDEX (ts),
                PRIMARY KEY(host)
            ) engine=mito"#;
        create_table(instance, sql).await;

        insert_and_query(instance).await;

        drop_table(instance).await;
    }

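    /// Same round trip as the standalone test, but against a mock distributed
    /// instance with a table partitioned on `host`, so the test can also check
    /// which rows land in which region and that the table is gone after the drop.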
    #[tokio::test(flavor = "multi_thread")]
    async fn test_distributed_exec_sql() {
        common_telemetry::init_default_ut_logging();

        let distributed = tests::create_distributed_instance("test_distributed_exec_sql").await;
        let frontend = distributed.frontend();
        let instance = frontend.as_ref();

        let sql = r#"
            CREATE TABLE demo(
                host STRING,
                ts TIMESTAMP,
                cpu DOUBLE NULL,
                memory DOUBLE NULL,
                disk_util DOUBLE DEFAULT 9.9,
                TIME INDEX (ts),
                PRIMARY KEY(host)
            )
            PARTITION ON COLUMNS (host) (
                host < '550-A',
                host >= '550-A' AND host < '550-W',
                host >= '550-W' AND host < 'MOSS',
                host >= 'MOSS'
            )
            engine=mito"#;
        create_table(instance, sql).await;

        insert_and_query(instance).await;

        verify_data_distribution(
            &distributed,
            HashMap::from([
                (
                    0u32,
                    "\
+---------------------+------+
| ts                  | host |
+---------------------+------+
| 2013-12-31T16:00:00 | 490  |
+---------------------+------+",
                ),
                (
                    1u32,
                    "\
+---------------------+-------+
| ts                  | host  |
+---------------------+-------+
| 2022-12-31T16:00:00 | 550-A |
+---------------------+-------+",
                ),
                (
                    2u32,
                    "\
+---------------------+-------+
| ts                  | host  |
+---------------------+-------+
| 2023-12-31T16:00:00 | 550-W |
+---------------------+-------+",
                ),
                (
                    3u32,
                    "\
+---------------------+------+
| ts                  | host |
+---------------------+------+
| 2043-12-31T16:00:00 | MOSS |
+---------------------+------+",
                ),
            ]),
        )
        .await;

        drop_table(instance).await;

        verify_table_is_dropped(&distributed).await;
    }

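    /// Runs a single SQL statement through the frontend's `SqlQueryHandler` and
    /// returns its first (and only) result, panicking on error.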
    async fn query(instance: &Instance, sql: &str) -> Output {
        SqlQueryHandler::do_query(instance, sql, QueryContext::arc())
            .await
            .remove(0)
            .unwrap()
    }

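    /// Creates a table and asserts the statement reports zero affected rows.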
    async fn create_table(instance: &Instance, sql: &str) {
        let output = query(instance, sql).await;
        let OutputData::AffectedRows(x) = output.data else {
            unreachable!()
        };
        assert_eq!(x, 0);
    }

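    /// Inserts four rows (one per partition range in the distributed test), then
    /// queries them back and compares the pretty-printed result, including the
    /// `disk_util` default of 9.9.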
    async fn insert_and_query(instance: &Instance) {
        let sql = r#"INSERT INTO demo(host, cpu, memory, ts) VALUES
                                ('490', 0.1, 1, 1388505600000),
                                ('550-A', 1, 100, 1672502400000),
                                ('550-W', 10000, 1000000, 1704038400000),
                                ('MOSS', 100000000, 10000000000, 2335190400000)
                                "#;
        let output = query(instance, sql).await;
        let OutputData::AffectedRows(x) = output.data else {
            unreachable!()
        };
        assert_eq!(x, 4);

        let sql = "SELECT * FROM demo WHERE ts > cast(1000000000 as timestamp) ORDER BY host"; // use nanoseconds in the WHERE condition
        let output = query(instance, sql).await;
        let OutputData::Stream(s) = output.data else {
            unreachable!()
        };
        let batches = common_recordbatch::util::collect_batches(s).await.unwrap();
        let pretty_print = batches.pretty_print().unwrap();
        let expected = "\
+-------+---------------------+-------------+---------------+-----------+
| host  | ts                  | cpu         | memory        | disk_util |
+-------+---------------------+-------------+---------------+-----------+
| 490   | 2013-12-31T16:00:00 | 0.1         | 1.0           | 9.9       |
| 550-A | 2022-12-31T16:00:00 | 1.0         | 100.0         | 9.9       |
| 550-W | 2023-12-31T16:00:00 | 10000.0     | 1000000.0     | 9.9       |
| MOSS  | 2043-12-31T16:00:00 | 100000000.0 | 10000000000.0 | 9.9       |
+-------+---------------------+-------------+---------------+-----------+";
        assert_eq!(pretty_print, expected);
    }

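    /// Resolves the `demo` table route metadata, maps each region to the
    /// datanode holding its leader, then reads every region directly through
    /// that datanode's region server with a Substrait-encoded plan and asserts
    /// it contains exactly the rows given in `expected_distribution` (keyed by
    /// region number).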
    async fn verify_data_distribution(
        instance: &MockDistributedInstance,
        expected_distribution: HashMap<u32, &str>,
    ) {
        let manager = instance.table_metadata_manager();
        let table_id = manager
            .table_name_manager()
            .get(TableNameKey::new(
                DEFAULT_CATALOG_NAME,
                DEFAULT_SCHEMA_NAME,
                "demo",
            ))
            .await
            .unwrap()
            .unwrap()
            .table_id();
        debug!("Reading table {table_id}");

        let table_route_value = manager
            .table_route_manager()
            .table_route_storage()
            .get(table_id)
            .await
            .unwrap()
            .unwrap();

        let region_to_dn_map = region_distribution(
            table_route_value
                .region_routes()
                .expect("region routes should be physical"),
        )
        .iter()
        .map(|(k, v)| (v.leader_regions[0], *k))
        .collect::<HashMap<u32, u64>>();
        assert!(region_to_dn_map.len() <= instance.datanodes().len());

        let stmt = QueryLanguageParser::parse_sql(
            "SELECT ts, host FROM demo ORDER BY ts",
            &QueryContext::arc(),
        )
        .unwrap();
        let plan = instance
            .frontend()
            .statement_executor()
            .plan(&stmt, QueryContext::arc())
            .await
            .unwrap();
        let plan = DFLogicalSubstraitConvertor
            .encode(&plan, DefaultSerializer)
            .unwrap();

        for (region, dn) in region_to_dn_map.iter() {
            let region_server = instance.datanodes().get(dn).unwrap().region_server();

            let region_id = RegionId::new(table_id, *region);

            let stream = region_server
                .handle_remote_read(
                    QueryRequest {
                        region_id: region_id.as_u64(),
                        plan: plan.to_vec(),
                        ..Default::default()
                    },
                    QueryContext::arc(),
                )
                .await
                .unwrap();

            let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
            let actual = recordbatches.pretty_print().unwrap();

            let expected = expected_distribution.get(region).unwrap();
            assert_eq!(&actual, expected);
        }
    }

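    /// Drops the `demo` table and asserts the statement reports zero affected rows.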
    async fn drop_table(instance: &Instance) {
        let sql = "DROP TABLE demo";
        let output = query(instance, sql).await;
        let OutputData::AffectedRows(x) = output.data else {
            unreachable!()
        };
        assert_eq!(x, 0);
    }

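    /// Asserts the frontend's catalog manager can no longer resolve
    /// `greptime.public.demo` after the drop.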
    async fn verify_table_is_dropped(instance: &MockDistributedInstance) {
        assert!(
            instance
                .frontend()
                .catalog_manager()
                .table("greptime", "public", "demo", None)
                .await
                .unwrap()
                .is_none()
        )
    }

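    /// Registers a `SqlQueryInterceptor` that counts its callbacks and rewrites
    /// the output, then verifies that all four hooks (pre/post parsing, pre/post
    /// execution) fire exactly once and that the rewritten affected-row count is
    /// what the caller observes.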
    #[tokio::test(flavor = "multi_thread")]
    async fn test_sql_interceptor_plugin() {
        #[derive(Default)]
        struct AssertionHook {
            pub(crate) c: AtomicU32,
        }

        impl SqlQueryInterceptor for AssertionHook {
            type Error = Error;

            fn pre_parsing<'a>(
                &self,
                query: &'a str,
                _query_ctx: QueryContextRef,
            ) -> Result<Cow<'a, str>> {
                let _ = self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                assert!(query.starts_with("CREATE TABLE demo"));
                Ok(Cow::Borrowed(query))
            }

            fn post_parsing(
                &self,
                statements: Vec<Statement>,
                _query_ctx: QueryContextRef,
            ) -> Result<Vec<Statement>> {
                let _ = self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                assert!(matches!(statements[0], Statement::CreateTable(_)));
                Ok(statements)
            }

            fn pre_execute(
                &self,
                _statement: &Statement,
                _plan: Option<&LogicalPlan>,
                _query_ctx: QueryContextRef,
            ) -> Result<()> {
                let _ = self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                Ok(())
            }

            fn post_execute(
                &self,
                mut output: Output,
                _query_ctx: QueryContextRef,
            ) -> Result<Output> {
                let _ = self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                match &mut output.data {
                    OutputData::AffectedRows(rows) => {
                        assert_eq!(*rows, 0);
                        // update output result
                        *rows = 10;
                    }
                    _ => unreachable!(),
                }
                Ok(output)
            }
        }

        let plugins = Plugins::new();
        let counter_hook = Arc::new(AssertionHook::default());
        plugins.insert::<SqlQueryInterceptorRef<Error>>(counter_hook.clone());

        let standalone = GreptimeDbStandaloneBuilder::new("test_sql_interceptor_plugin")
            .with_plugin(plugins)
            .build()
            .await;
        let instance = standalone.fe_instance().clone();

        let sql = r#"CREATE TABLE demo(
                            host STRING,
                            ts TIMESTAMP,
                            cpu DOUBLE NULL,
                            memory DOUBLE NULL,
                            disk_util DOUBLE DEFAULT 9.9,
                            TIME INDEX (ts),
                            PRIMARY KEY(host)
                        ) engine=mito;"#;
        let output = SqlQueryHandler::do_query(&*instance, sql, QueryContext::arc())
            .await
            .remove(0)
            .unwrap();

        // assert that the hook is called 4 times (once per callback)
        assert_eq!(4, counter_hook.c.load(std::sync::atomic::Ordering::Relaxed));
        match output.data {
            OutputData::AffectedRows(rows) => assert_eq!(rows, 10),
            _ => unreachable!(),
        }
    }

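    /// Registers an interceptor that rejects database-level statements in
    /// `post_parsing`, then verifies that `CREATE TABLE` still works while
    /// `CREATE DATABASE` and a query containing `SHOW DATABASES` fail with
    /// `StatusCode::Unsupported`.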
    #[tokio::test(flavor = "multi_thread")]
    async fn test_disable_db_operation_plugin() {
        #[derive(Default)]
        struct DisableDBOpHook;

        impl SqlQueryInterceptor for DisableDBOpHook {
            type Error = Error;

            fn post_parsing(
                &self,
                statements: Vec<Statement>,
                _query_ctx: QueryContextRef,
            ) -> Result<Vec<Statement>> {
                for s in &statements {
                    match s {
                        Statement::CreateDatabase(_) | Statement::ShowDatabases(_) => {
                            return Err(Error::NotSupported {
                                feat: "Database operations".to_owned(),
                            });
                        }
                        _ => {}
                    }
                }

                Ok(statements)
            }
        }

        let query_ctx = QueryContext::arc();

        let plugins = Plugins::new();
        let hook = Arc::new(DisableDBOpHook);
        plugins.insert::<SqlQueryInterceptorRef<Error>>(hook.clone());

        let standalone = GreptimeDbStandaloneBuilder::new("test_disable_db_operation_plugin")
            .with_plugin(plugins)
            .build()
            .await;
        let instance = standalone.fe_instance().clone();

        let sql = r#"CREATE TABLE demo(
                            host STRING,
                            ts TIMESTAMP,
                            cpu DOUBLE NULL,
                            memory DOUBLE NULL,
                            disk_util DOUBLE DEFAULT 9.9,
                            TIME INDEX (ts),
                            PRIMARY KEY(host)
                        ) engine=mito;"#;
        let output = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
            .await
            .remove(0)
            .unwrap();

        match output.data {
            OutputData::AffectedRows(rows) => assert_eq!(rows, 0),
            _ => unreachable!(),
        }

        let sql = r#"CREATE DATABASE tomcat"#;
        if let Err(e) = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
            .await
            .remove(0)
        {
            assert_eq!(e.status_code(), StatusCode::Unsupported);
        } else {
            unreachable!();
        }

        let sql = r#"SELECT 1; SHOW DATABASES"#;
        if let Err(e) = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
            .await
            .remove(0)
        {
            assert_eq!(e.status_code(), StatusCode::Unsupported);
        } else {
            unreachable!();
        }
    }
}