frontend/
instance.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod builder;
16mod grpc;
17mod influxdb;
18mod jaeger;
19mod log_handler;
20mod logs;
21mod opentsdb;
22mod otlp;
23pub mod prom_store;
24mod promql;
25mod region_query;
26pub mod standalone;
27
28use std::sync::Arc;
29use std::time::SystemTime;
30
31use async_trait::async_trait;
32use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
33use catalog::CatalogManagerRef;
34use client::OutputData;
35use common_base::Plugins;
36use common_config::KvBackendConfig;
37use common_error::ext::{BoxedError, ErrorExt};
38use common_meta::key::TableMetadataManagerRef;
39use common_meta::kv_backend::KvBackendRef;
40use common_meta::state_store::KvStateStore;
41use common_procedure::local::{LocalManager, ManagerConfig};
42use common_procedure::options::ProcedureConfig;
43use common_procedure::ProcedureManagerRef;
44use common_query::Output;
45use common_telemetry::{debug, error, info, tracing};
46use datafusion_expr::LogicalPlan;
47use log_store::raft_engine::RaftEngineBackend;
48use operator::delete::DeleterRef;
49use operator::insert::InserterRef;
50use operator::statement::{StatementExecutor, StatementExecutorRef};
51use pipeline::pipeline_operator::PipelineOperator;
52use prometheus::HistogramTimer;
53use promql_parser::label::Matcher;
54use query::metrics::OnDone;
55use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
56use query::query_engine::options::{validate_catalog_and_schema, QueryOptions};
57use query::query_engine::DescribeResult;
58use query::QueryEngineRef;
59use servers::error as server_error;
60use servers::error::{AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu};
61use servers::interceptor::{
62    PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
63};
64use servers::prometheus_handler::PrometheusHandler;
65use servers::query_handler::sql::SqlQueryHandler;
66use session::context::QueryContextRef;
67use session::table_name::table_idents_to_full_name;
68use snafu::prelude::*;
69use sql::dialect::Dialect;
70use sql::parser::{ParseOptions, ParserContext};
71use sql::statements::copy::{CopyDatabase, CopyTable};
72use sql::statements::statement::Statement;
73use sqlparser::ast::ObjectName;
74pub use standalone::StandaloneDatanodeManager;
75
76use crate::error::{
77    self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, InvalidSqlSnafu,
78    ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
79    TableOperationSnafu,
80};
81use crate::limiter::LimiterRef;
82use crate::slow_query_recorder::SlowQueryRecorder;
83
84/// The frontend instance contains necessary components, and implements many
85/// traits, like [`servers::query_handler::grpc::GrpcQueryHandler`],
86/// [`servers::query_handler::sql::SqlQueryHandler`], etc.
87#[derive(Clone)]
88pub struct Instance {
89    catalog_manager: CatalogManagerRef,
90    pipeline_operator: Arc<PipelineOperator>,
91    statement_executor: Arc<StatementExecutor>,
92    query_engine: QueryEngineRef,
93    plugins: Plugins,
94    inserter: InserterRef,
95    deleter: DeleterRef,
96    table_metadata_manager: TableMetadataManagerRef,
97    slow_query_recorder: Option<SlowQueryRecorder>,
98    limiter: Option<LimiterRef>,
99}
100
101impl Instance {
102    pub async fn try_build_standalone_components(
103        dir: String,
104        kv_backend_config: KvBackendConfig,
105        procedure_config: ProcedureConfig,
106    ) -> Result<(KvBackendRef, ProcedureManagerRef)> {
107        info!(
108            "Creating metadata kvbackend with config: {:?}",
109            kv_backend_config
110        );
111        let kv_backend = RaftEngineBackend::try_open_with_cfg(dir, &kv_backend_config)
112            .map_err(BoxedError::new)
113            .context(error::OpenRaftEngineBackendSnafu)?;
114
115        let kv_backend = Arc::new(kv_backend);
116        let kv_state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
117
118        let manager_config = ManagerConfig {
119            max_retry_times: procedure_config.max_retry_times,
120            retry_delay: procedure_config.retry_delay,
121            max_running_procedures: procedure_config.max_running_procedures,
122            ..Default::default()
123        };
124        let procedure_manager = Arc::new(LocalManager::new(
125            manager_config,
126            kv_state_store.clone(),
127            kv_state_store,
128        ));
129
130        Ok((kv_backend, procedure_manager))
131    }
132
133    pub fn catalog_manager(&self) -> &CatalogManagerRef {
134        &self.catalog_manager
135    }
136
137    pub fn query_engine(&self) -> &QueryEngineRef {
138        &self.query_engine
139    }
140
141    pub fn plugins(&self) -> &Plugins {
142        &self.plugins
143    }
144
145    pub fn statement_executor(&self) -> &StatementExecutorRef {
146        &self.statement_executor
147    }
148
149    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
150        &self.table_metadata_manager
151    }
152
153    pub fn inserter(&self) -> &InserterRef {
154        &self.inserter
155    }
156}
157
158fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
159    ParserContext::create_with_dialect(sql, dialect, ParseOptions::default()).context(ParseSqlSnafu)
160}
161
162impl Instance {
163    async fn query_statement(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
164        check_permission(self.plugins.clone(), &stmt, &query_ctx)?;
165
166        let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
167        let query_interceptor = query_interceptor.as_ref();
168
169        let _slow_query_timer = if let Some(recorder) = &self.slow_query_recorder {
170            recorder.start(QueryStatement::Sql(stmt.clone()), query_ctx.clone())
171        } else {
172            None
173        };
174
175        let output = match stmt {
176            Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
177                // TODO: remove this when format is supported in datafusion
178                if let Statement::Explain(explain) = &stmt {
179                    if let Some(format) = explain.format() {
180                        query_ctx.set_explain_format(format.to_string());
181                    }
182                }
183
184                let stmt = QueryStatement::Sql(stmt);
185                let plan = self
186                    .statement_executor
187                    .plan(&stmt, query_ctx.clone())
188                    .await?;
189
190                let QueryStatement::Sql(stmt) = stmt else {
191                    unreachable!()
192                };
193                query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?;
194
195                self.statement_executor.exec_plan(plan, query_ctx).await
196            }
197            Statement::Tql(tql) => {
198                let plan = self
199                    .statement_executor
200                    .plan_tql(tql.clone(), &query_ctx)
201                    .await?;
202
203                query_interceptor.pre_execute(
204                    &Statement::Tql(tql),
205                    Some(&plan),
206                    query_ctx.clone(),
207                )?;
208
209                self.statement_executor.exec_plan(plan, query_ctx).await
210            }
211            _ => {
212                query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?;
213
214                self.statement_executor.execute_sql(stmt, query_ctx).await
215            }
216        };
217
218        output.context(TableOperationSnafu)
219    }
220}
221
222#[async_trait]
223impl SqlQueryHandler for Instance {
224    type Error = Error;
225
226    #[tracing::instrument(skip_all)]
227    async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
228        let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
229        let query_interceptor = query_interceptor_opt.as_ref();
230        let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) {
231            Ok(q) => q,
232            Err(e) => return vec![Err(e)],
233        };
234
235        let checker_ref = self.plugins.get::<PermissionCheckerRef>();
236        let checker = checker_ref.as_ref();
237
238        match parse_stmt(query.as_ref(), query_ctx.sql_dialect())
239            .and_then(|stmts| query_interceptor.post_parsing(stmts, query_ctx.clone()))
240        {
241            Ok(stmts) => {
242                let mut results = Vec::with_capacity(stmts.len());
243                for stmt in stmts {
244                    if let Err(e) = checker
245                        .check_permission(
246                            query_ctx.current_user(),
247                            PermissionReq::SqlStatement(&stmt),
248                        )
249                        .context(PermissionSnafu)
250                    {
251                        results.push(Err(e));
252                        break;
253                    }
254
255                    match self.query_statement(stmt.clone(), query_ctx.clone()).await {
256                        Ok(output) => {
257                            let output_result =
258                                query_interceptor.post_execute(output, query_ctx.clone());
259                            results.push(output_result);
260                        }
261                        Err(e) => {
262                            if e.status_code().should_log_error() {
263                                error!(e; "Failed to execute query: {stmt}");
264                            } else {
265                                debug!("Failed to execute query: {stmt}, {e}");
266                            }
267                            results.push(Err(e));
268                            break;
269                        }
270                    }
271                }
272                results
273            }
274            Err(e) => {
275                vec![Err(e)]
276            }
277        }
278    }
279
280    async fn do_exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
281        // plan should be prepared before exec
282        // we'll do check there
283        self.query_engine
284            .execute(plan.clone(), query_ctx)
285            .await
286            .context(ExecLogicalPlanSnafu)
287    }
288
289    #[tracing::instrument(skip_all)]
290    async fn do_promql_query(
291        &self,
292        query: &PromQuery,
293        query_ctx: QueryContextRef,
294    ) -> Vec<Result<Output>> {
295        // check will be done in prometheus handler's do_query
296        let result = PrometheusHandler::do_query(self, query, query_ctx)
297            .await
298            .with_context(|_| ExecutePromqlSnafu {
299                query: format!("{query:?}"),
300            });
301        vec![result]
302    }
303
304    async fn do_describe(
305        &self,
306        stmt: Statement,
307        query_ctx: QueryContextRef,
308    ) -> Result<Option<DescribeResult>> {
309        if matches!(
310            stmt,
311            Statement::Insert(_) | Statement::Query(_) | Statement::Delete(_)
312        ) {
313            self.plugins
314                .get::<PermissionCheckerRef>()
315                .as_ref()
316                .check_permission(query_ctx.current_user(), PermissionReq::SqlStatement(&stmt))
317                .context(PermissionSnafu)?;
318
319            let plan = self
320                .query_engine
321                .planner()
322                .plan(&QueryStatement::Sql(stmt), query_ctx.clone())
323                .await
324                .context(PlanStatementSnafu)?;
325            self.query_engine
326                .describe(plan, query_ctx)
327                .await
328                .map(Some)
329                .context(error::DescribeStatementSnafu)
330        } else {
331            Ok(None)
332        }
333    }
334
335    async fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result<bool> {
336        self.catalog_manager
337            .schema_exists(catalog, schema, None)
338            .await
339            .context(error::CatalogSnafu)
340    }
341}
342
343/// Attaches a timer to the output and observes it once the output is exhausted.
344pub fn attach_timer(output: Output, timer: HistogramTimer) -> Output {
345    match output.data {
346        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
347        OutputData::Stream(stream) => {
348            let stream = OnDone::new(stream, move || {
349                timer.observe_duration();
350            });
351            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
352        }
353    }
354}
355
356#[async_trait]
357impl PrometheusHandler for Instance {
358    #[tracing::instrument(skip_all)]
359    async fn do_query(
360        &self,
361        query: &PromQuery,
362        query_ctx: QueryContextRef,
363    ) -> server_error::Result<Output> {
364        let interceptor = self
365            .plugins
366            .get::<PromQueryInterceptorRef<server_error::Error>>();
367
368        self.plugins
369            .get::<PermissionCheckerRef>()
370            .as_ref()
371            .check_permission(query_ctx.current_user(), PermissionReq::PromQuery)
372            .context(AuthSnafu)?;
373
374        let stmt = QueryLanguageParser::parse_promql(query, &query_ctx).with_context(|_| {
375            ParsePromQLSnafu {
376                query: query.clone(),
377            }
378        })?;
379
380        let _slow_query_timer = if let Some(recorder) = &self.slow_query_recorder {
381            recorder.start(stmt.clone(), query_ctx.clone())
382        } else {
383            None
384        };
385
386        let plan = self
387            .statement_executor
388            .plan(&stmt, query_ctx.clone())
389            .await
390            .map_err(BoxedError::new)
391            .context(ExecuteQuerySnafu)?;
392
393        interceptor.pre_execute(query, Some(&plan), query_ctx.clone())?;
394
395        let output = self
396            .statement_executor
397            .exec_plan(plan, query_ctx.clone())
398            .await
399            .map_err(BoxedError::new)
400            .context(ExecuteQuerySnafu)?;
401
402        Ok(interceptor.post_execute(output, query_ctx)?)
403    }
404
405    async fn query_metric_names(
406        &self,
407        matchers: Vec<Matcher>,
408        ctx: &QueryContextRef,
409    ) -> server_error::Result<Vec<String>> {
410        self.handle_query_metric_names(matchers, ctx)
411            .await
412            .map_err(BoxedError::new)
413            .context(ExecuteQuerySnafu)
414    }
415
416    async fn query_label_values(
417        &self,
418        metric: String,
419        label_name: String,
420        matchers: Vec<Matcher>,
421        start: SystemTime,
422        end: SystemTime,
423        ctx: &QueryContextRef,
424    ) -> server_error::Result<Vec<String>> {
425        self.handle_query_label_values(metric, label_name, matchers, start, end, ctx)
426            .await
427            .map_err(BoxedError::new)
428            .context(ExecuteQuerySnafu)
429    }
430
431    fn catalog_manager(&self) -> CatalogManagerRef {
432        self.catalog_manager.clone()
433    }
434}
435
436/// Validate `stmt.database` permission if it's presented.
437macro_rules! validate_db_permission {
438    ($stmt: expr, $query_ctx: expr) => {
439        if let Some(database) = &$stmt.database {
440            validate_catalog_and_schema($query_ctx.current_catalog(), database, $query_ctx)
441                .map_err(BoxedError::new)
442                .context(SqlExecInterceptedSnafu)?;
443        }
444    };
445}
446
447pub fn check_permission(
448    plugins: Plugins,
449    stmt: &Statement,
450    query_ctx: &QueryContextRef,
451) -> Result<()> {
452    let need_validate = plugins
453        .get::<QueryOptions>()
454        .map(|opts| opts.disallow_cross_catalog_query)
455        .unwrap_or_default();
456
457    if !need_validate {
458        return Ok(());
459    }
460
461    match stmt {
462        // Will be checked in execution.
463        // TODO(dennis): add a hook for admin commands.
464        Statement::Admin(_) => {}
465        // These are executed by query engine, and will be checked there.
466        Statement::Query(_)
467        | Statement::Explain(_)
468        | Statement::Tql(_)
469        | Statement::Delete(_)
470        | Statement::DeclareCursor(_)
471        | Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(_)) => {}
472        // database ops won't be checked
473        Statement::CreateDatabase(_)
474        | Statement::ShowDatabases(_)
475        | Statement::DropDatabase(_)
476        | Statement::AlterDatabase(_)
477        | Statement::DropFlow(_)
478        | Statement::Use(_) => {}
479        Statement::ShowCreateDatabase(stmt) => {
480            validate_database(&stmt.database_name, query_ctx)?;
481        }
482        Statement::ShowCreateTable(stmt) => {
483            validate_param(&stmt.table_name, query_ctx)?;
484        }
485        Statement::ShowCreateFlow(stmt) => {
486            validate_param(&stmt.flow_name, query_ctx)?;
487        }
488        Statement::ShowCreateView(stmt) => {
489            validate_param(&stmt.view_name, query_ctx)?;
490        }
491        Statement::CreateExternalTable(stmt) => {
492            validate_param(&stmt.name, query_ctx)?;
493        }
494        Statement::CreateFlow(stmt) => {
495            // TODO: should also validate source table name here?
496            validate_param(&stmt.sink_table_name, query_ctx)?;
497        }
498        Statement::CreateView(stmt) => {
499            validate_param(&stmt.name, query_ctx)?;
500        }
501        Statement::AlterTable(stmt) => {
502            validate_param(stmt.table_name(), query_ctx)?;
503        }
504        // set/show variable now only alter/show variable in session
505        Statement::SetVariables(_) | Statement::ShowVariables(_) => {}
506        // show charset and show collation won't be checked
507        Statement::ShowCharset(_) | Statement::ShowCollation(_) => {}
508
509        Statement::Insert(insert) => {
510            let name = insert.table_name().context(ParseSqlSnafu)?;
511            validate_param(name, query_ctx)?;
512        }
513        Statement::CreateTable(stmt) => {
514            validate_param(&stmt.name, query_ctx)?;
515        }
516        Statement::CreateTableLike(stmt) => {
517            validate_param(&stmt.table_name, query_ctx)?;
518            validate_param(&stmt.source_name, query_ctx)?;
519        }
520        Statement::DropTable(drop_stmt) => {
521            for table_name in drop_stmt.table_names() {
522                validate_param(table_name, query_ctx)?;
523            }
524        }
525        Statement::DropView(stmt) => {
526            validate_param(&stmt.view_name, query_ctx)?;
527        }
528        Statement::ShowTables(stmt) => {
529            validate_db_permission!(stmt, query_ctx);
530        }
531        Statement::ShowTableStatus(stmt) => {
532            validate_db_permission!(stmt, query_ctx);
533        }
534        Statement::ShowColumns(stmt) => {
535            validate_db_permission!(stmt, query_ctx);
536        }
537        Statement::ShowIndex(stmt) => {
538            validate_db_permission!(stmt, query_ctx);
539        }
540        Statement::ShowRegion(stmt) => {
541            validate_db_permission!(stmt, query_ctx);
542        }
543        Statement::ShowViews(stmt) => {
544            validate_db_permission!(stmt, query_ctx);
545        }
546        Statement::ShowFlows(stmt) => {
547            validate_db_permission!(stmt, query_ctx);
548        }
549        Statement::ShowStatus(_stmt) => {}
550        Statement::ShowSearchPath(_stmt) => {}
551        Statement::DescribeTable(stmt) => {
552            validate_param(stmt.name(), query_ctx)?;
553        }
554        Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => match stmt {
555            CopyTable::To(copy_table_to) => validate_param(&copy_table_to.table_name, query_ctx)?,
556            CopyTable::From(copy_table_from) => {
557                validate_param(&copy_table_from.table_name, query_ctx)?
558            }
559        },
560        Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
561            match copy_database {
562                CopyDatabase::To(stmt) => validate_database(&stmt.database_name, query_ctx)?,
563                CopyDatabase::From(stmt) => validate_database(&stmt.database_name, query_ctx)?,
564            }
565        }
566        Statement::TruncateTable(stmt) => {
567            validate_param(stmt.table_name(), query_ctx)?;
568        }
569        // cursor operations are always allowed once it's created
570        Statement::FetchCursor(_) | Statement::CloseCursor(_) => {}
571    }
572    Ok(())
573}
574
575fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
576    let (catalog, schema, _) = table_idents_to_full_name(name, query_ctx)
577        .map_err(BoxedError::new)
578        .context(ExternalSnafu)?;
579
580    validate_catalog_and_schema(&catalog, &schema, query_ctx)
581        .map_err(BoxedError::new)
582        .context(SqlExecInterceptedSnafu)
583}
584
585fn validate_database(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
586    let (catalog, schema) = match &name.0[..] {
587        [schema] => (
588            query_ctx.current_catalog().to_string(),
589            schema.value.clone(),
590        ),
591        [catalog, schema] => (catalog.value.clone(), schema.value.clone()),
592        _ => InvalidSqlSnafu {
593            err_msg: format!(
594                "expect database name to be <catalog>.<schema> or <schema>, actual: {name}",
595            ),
596        }
597        .fail()?,
598    };
599
600    validate_catalog_and_schema(&catalog, &schema, query_ctx)
601        .map_err(BoxedError::new)
602        .context(SqlExecInterceptedSnafu)
603}
604
605#[cfg(test)]
606mod tests {
607    use std::collections::HashMap;
608
609    use common_base::Plugins;
610    use query::query_engine::options::QueryOptions;
611    use session::context::QueryContext;
612    use sql::dialect::GreptimeDbDialect;
613    use strfmt::Format;
614
615    use super::*;
616
617    #[test]
618    fn test_exec_validation() {
619        let query_ctx = QueryContext::arc();
620        let plugins: Plugins = Plugins::new();
621        plugins.insert(QueryOptions {
622            disallow_cross_catalog_query: true,
623        });
624
625        let sql = r#"
626        SELECT * FROM demo;
627        EXPLAIN SELECT * FROM demo;
628        CREATE DATABASE test_database;
629        SHOW DATABASES;
630        "#;
631        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
632        assert_eq!(stmts.len(), 4);
633        for stmt in stmts {
634            let re = check_permission(plugins.clone(), &stmt, &query_ctx);
635            re.unwrap();
636        }
637
638        let sql = r#"
639        SHOW CREATE TABLE demo;
640        ALTER TABLE demo ADD COLUMN new_col INT;
641        "#;
642        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
643        assert_eq!(stmts.len(), 2);
644        for stmt in stmts {
645            let re = check_permission(plugins.clone(), &stmt, &query_ctx);
646            re.unwrap();
647        }
648
649        fn replace_test(template_sql: &str, plugins: Plugins, query_ctx: &QueryContextRef) {
650            // test right
651            let right = vec![("", ""), ("", "public."), ("greptime.", "public.")];
652            for (catalog, schema) in right {
653                let sql = do_fmt(template_sql, catalog, schema);
654                do_test(&sql, plugins.clone(), query_ctx, true);
655            }
656
657            let wrong = vec![
658                ("wrongcatalog.", "public."),
659                ("wrongcatalog.", "wrongschema."),
660            ];
661            for (catalog, schema) in wrong {
662                let sql = do_fmt(template_sql, catalog, schema);
663                do_test(&sql, plugins.clone(), query_ctx, false);
664            }
665        }
666
667        fn do_fmt(template: &str, catalog: &str, schema: &str) -> String {
668            let vars = HashMap::from([
669                ("catalog".to_string(), catalog),
670                ("schema".to_string(), schema),
671            ]);
672            template.format(&vars).unwrap()
673        }
674
675        fn do_test(sql: &str, plugins: Plugins, query_ctx: &QueryContextRef, is_ok: bool) {
676            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
677            let re = check_permission(plugins, stmt, query_ctx);
678            if is_ok {
679                re.unwrap();
680            } else {
681                assert!(re.is_err());
682            }
683        }
684
685        // test insert
686        let sql = "INSERT INTO {catalog}{schema}monitor(host) VALUES ('host1');";
687        replace_test(sql, plugins.clone(), &query_ctx);
688
689        // test create table
690        let sql = r#"CREATE TABLE {catalog}{schema}demo(
691                            host STRING,
692                            ts TIMESTAMP,
693                            TIME INDEX (ts),
694                            PRIMARY KEY(host)
695                        ) engine=mito;"#;
696        replace_test(sql, plugins.clone(), &query_ctx);
697
698        // test drop table
699        let sql = "DROP TABLE {catalog}{schema}demo;";
700        replace_test(sql, plugins.clone(), &query_ctx);
701
702        // test show tables
703        let sql = "SHOW TABLES FROM public";
704        let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
705        check_permission(plugins.clone(), &stmt[0], &query_ctx).unwrap();
706
707        let sql = "SHOW TABLES FROM private";
708        let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
709        let re = check_permission(plugins.clone(), &stmt[0], &query_ctx);
710        assert!(re.is_ok());
711
712        // test describe table
713        let sql = "DESC TABLE {catalog}{schema}demo;";
714        replace_test(sql, plugins, &query_ctx);
715    }
716}