frontend/
instance.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod builder;
16mod grpc;
17mod influxdb;
18mod jaeger;
19mod log_handler;
20mod logs;
21mod opentsdb;
22mod otlp;
23pub mod prom_store;
24mod promql;
25mod region_query;
26pub mod standalone;
27
28use std::sync::Arc;
29use std::time::SystemTime;
30
31use async_trait::async_trait;
32use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
33use catalog::CatalogManagerRef;
34use client::OutputData;
35use common_base::Plugins;
36use common_config::KvBackendConfig;
37use common_error::ext::{BoxedError, ErrorExt};
38use common_meta::key::TableMetadataManagerRef;
39use common_meta::kv_backend::KvBackendRef;
40use common_meta::state_store::KvStateStore;
41use common_procedure::local::{LocalManager, ManagerConfig};
42use common_procedure::options::ProcedureConfig;
43use common_procedure::ProcedureManagerRef;
44use common_query::Output;
45use common_telemetry::{debug, error, info, tracing};
46use datafusion_expr::LogicalPlan;
47use log_store::raft_engine::RaftEngineBackend;
48use operator::delete::DeleterRef;
49use operator::insert::InserterRef;
50use operator::statement::{StatementExecutor, StatementExecutorRef};
51use pipeline::pipeline_operator::PipelineOperator;
52use prometheus::HistogramTimer;
53use promql_parser::label::Matcher;
54use query::metrics::OnDone;
55use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
56use query::query_engine::options::{validate_catalog_and_schema, QueryOptions};
57use query::query_engine::DescribeResult;
58use query::stats::StatementStatistics;
59use query::QueryEngineRef;
60use servers::error as server_error;
61use servers::error::{AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu};
62use servers::interceptor::{
63    PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
64};
65use servers::prometheus_handler::PrometheusHandler;
66use servers::query_handler::sql::SqlQueryHandler;
67use session::context::QueryContextRef;
68use session::table_name::table_idents_to_full_name;
69use snafu::prelude::*;
70use sql::dialect::Dialect;
71use sql::parser::{ParseOptions, ParserContext};
72use sql::statements::copy::{CopyDatabase, CopyTable};
73use sql::statements::statement::Statement;
74use sqlparser::ast::ObjectName;
75pub use standalone::StandaloneDatanodeManager;
76
77use crate::error::{
78    self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, InvalidSqlSnafu,
79    ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
80    TableOperationSnafu,
81};
82use crate::limiter::LimiterRef;
83
84/// The frontend instance contains necessary components, and implements many
85/// traits, like [`servers::query_handler::grpc::GrpcQueryHandler`],
86/// [`servers::query_handler::sql::SqlQueryHandler`], etc.
87#[derive(Clone)]
88pub struct Instance {
89    catalog_manager: CatalogManagerRef,
90    pipeline_operator: Arc<PipelineOperator>,
91    statement_executor: Arc<StatementExecutor>,
92    query_engine: QueryEngineRef,
93    plugins: Plugins,
94    inserter: InserterRef,
95    deleter: DeleterRef,
96    table_metadata_manager: TableMetadataManagerRef,
97    stats: StatementStatistics,
98    limiter: Option<LimiterRef>,
99}
100
101impl Instance {
102    pub async fn try_build_standalone_components(
103        dir: String,
104        kv_backend_config: KvBackendConfig,
105        procedure_config: ProcedureConfig,
106    ) -> Result<(KvBackendRef, ProcedureManagerRef)> {
107        info!(
108            "Creating metadata kvbackend with config: {:?}",
109            kv_backend_config
110        );
111        let kv_backend = RaftEngineBackend::try_open_with_cfg(dir, &kv_backend_config)
112            .map_err(BoxedError::new)
113            .context(error::OpenRaftEngineBackendSnafu)?;
114
115        let kv_backend = Arc::new(kv_backend);
116        let kv_state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
117
118        let manager_config = ManagerConfig {
119            max_retry_times: procedure_config.max_retry_times,
120            retry_delay: procedure_config.retry_delay,
121            max_running_procedures: procedure_config.max_running_procedures,
122            ..Default::default()
123        };
124        let procedure_manager = Arc::new(LocalManager::new(
125            manager_config,
126            kv_state_store.clone(),
127            kv_state_store,
128        ));
129
130        Ok((kv_backend, procedure_manager))
131    }
132
133    pub fn catalog_manager(&self) -> &CatalogManagerRef {
134        &self.catalog_manager
135    }
136
137    pub fn query_engine(&self) -> &QueryEngineRef {
138        &self.query_engine
139    }
140
141    pub fn plugins(&self) -> &Plugins {
142        &self.plugins
143    }
144
145    pub fn statement_executor(&self) -> &StatementExecutorRef {
146        &self.statement_executor
147    }
148
149    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
150        &self.table_metadata_manager
151    }
152
153    pub fn inserter(&self) -> &InserterRef {
154        &self.inserter
155    }
156}
157
158fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
159    ParserContext::create_with_dialect(sql, dialect, ParseOptions::default()).context(ParseSqlSnafu)
160}
161
162impl Instance {
163    async fn query_statement(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
164        check_permission(self.plugins.clone(), &stmt, &query_ctx)?;
165
166        let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
167        let query_interceptor = query_interceptor.as_ref();
168
169        let _slow_query_timer = self
170            .stats
171            .start_slow_query_timer(QueryStatement::Sql(stmt.clone()));
172
173        let output = match stmt {
174            Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
175                // TODO: remove this when format is supported in datafusion
176                if let Statement::Explain(explain) = &stmt {
177                    if let Some(format) = explain.format() {
178                        query_ctx.set_explain_format(format.to_string());
179                    }
180                }
181
182                let stmt = QueryStatement::Sql(stmt);
183                let plan = self
184                    .statement_executor
185                    .plan(&stmt, query_ctx.clone())
186                    .await?;
187
188                let QueryStatement::Sql(stmt) = stmt else {
189                    unreachable!()
190                };
191                query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?;
192
193                self.statement_executor.exec_plan(plan, query_ctx).await
194            }
195            Statement::Tql(tql) => {
196                let plan = self
197                    .statement_executor
198                    .plan_tql(tql.clone(), &query_ctx)
199                    .await?;
200
201                query_interceptor.pre_execute(
202                    &Statement::Tql(tql),
203                    Some(&plan),
204                    query_ctx.clone(),
205                )?;
206
207                self.statement_executor.exec_plan(plan, query_ctx).await
208            }
209            _ => {
210                query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?;
211
212                self.statement_executor.execute_sql(stmt, query_ctx).await
213            }
214        };
215        output.context(TableOperationSnafu)
216    }
217}
218
219#[async_trait]
220impl SqlQueryHandler for Instance {
221    type Error = Error;
222
223    #[tracing::instrument(skip_all)]
224    async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
225        let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
226        let query_interceptor = query_interceptor_opt.as_ref();
227        let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) {
228            Ok(q) => q,
229            Err(e) => return vec![Err(e)],
230        };
231
232        let checker_ref = self.plugins.get::<PermissionCheckerRef>();
233        let checker = checker_ref.as_ref();
234
235        match parse_stmt(query.as_ref(), query_ctx.sql_dialect())
236            .and_then(|stmts| query_interceptor.post_parsing(stmts, query_ctx.clone()))
237        {
238            Ok(stmts) => {
239                let mut results = Vec::with_capacity(stmts.len());
240                for stmt in stmts {
241                    if let Err(e) = checker
242                        .check_permission(
243                            query_ctx.current_user(),
244                            PermissionReq::SqlStatement(&stmt),
245                        )
246                        .context(PermissionSnafu)
247                    {
248                        results.push(Err(e));
249                        break;
250                    }
251
252                    match self.query_statement(stmt.clone(), query_ctx.clone()).await {
253                        Ok(output) => {
254                            let output_result =
255                                query_interceptor.post_execute(output, query_ctx.clone());
256                            results.push(output_result);
257                        }
258                        Err(e) => {
259                            if e.status_code().should_log_error() {
260                                error!(e; "Failed to execute query: {stmt}");
261                            } else {
262                                debug!("Failed to execute query: {stmt}, {e}");
263                            }
264                            results.push(Err(e));
265                            break;
266                        }
267                    }
268                }
269                results
270            }
271            Err(e) => {
272                vec![Err(e)]
273            }
274        }
275    }
276
277    async fn do_exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
278        // plan should be prepared before exec
279        // we'll do check there
280        self.query_engine
281            .execute(plan.clone(), query_ctx)
282            .await
283            .context(ExecLogicalPlanSnafu)
284    }
285
286    #[tracing::instrument(skip_all)]
287    async fn do_promql_query(
288        &self,
289        query: &PromQuery,
290        query_ctx: QueryContextRef,
291    ) -> Vec<Result<Output>> {
292        // check will be done in prometheus handler's do_query
293        let result = PrometheusHandler::do_query(self, query, query_ctx)
294            .await
295            .with_context(|_| ExecutePromqlSnafu {
296                query: format!("{query:?}"),
297            });
298        vec![result]
299    }
300
301    async fn do_describe(
302        &self,
303        stmt: Statement,
304        query_ctx: QueryContextRef,
305    ) -> Result<Option<DescribeResult>> {
306        if matches!(
307            stmt,
308            Statement::Insert(_) | Statement::Query(_) | Statement::Delete(_)
309        ) {
310            self.plugins
311                .get::<PermissionCheckerRef>()
312                .as_ref()
313                .check_permission(query_ctx.current_user(), PermissionReq::SqlStatement(&stmt))
314                .context(PermissionSnafu)?;
315
316            let plan = self
317                .query_engine
318                .planner()
319                .plan(&QueryStatement::Sql(stmt), query_ctx.clone())
320                .await
321                .context(PlanStatementSnafu)?;
322            self.query_engine
323                .describe(plan, query_ctx)
324                .await
325                .map(Some)
326                .context(error::DescribeStatementSnafu)
327        } else {
328            Ok(None)
329        }
330    }
331
332    async fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result<bool> {
333        self.catalog_manager
334            .schema_exists(catalog, schema, None)
335            .await
336            .context(error::CatalogSnafu)
337    }
338}
339
340/// Attaches a timer to the output and observes it once the output is exhausted.
341pub fn attach_timer(output: Output, timer: HistogramTimer) -> Output {
342    match output.data {
343        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
344        OutputData::Stream(stream) => {
345            let stream = OnDone::new(stream, move || {
346                timer.observe_duration();
347            });
348            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
349        }
350    }
351}
352
353#[async_trait]
354impl PrometheusHandler for Instance {
355    #[tracing::instrument(skip_all)]
356    async fn do_query(
357        &self,
358        query: &PromQuery,
359        query_ctx: QueryContextRef,
360    ) -> server_error::Result<Output> {
361        let interceptor = self
362            .plugins
363            .get::<PromQueryInterceptorRef<server_error::Error>>();
364
365        self.plugins
366            .get::<PermissionCheckerRef>()
367            .as_ref()
368            .check_permission(query_ctx.current_user(), PermissionReq::PromQuery)
369            .context(AuthSnafu)?;
370
371        let stmt = QueryLanguageParser::parse_promql(query, &query_ctx).with_context(|_| {
372            ParsePromQLSnafu {
373                query: query.clone(),
374            }
375        })?;
376
377        let _slow_query_timer = self.stats.start_slow_query_timer(stmt.clone());
378
379        let plan = self
380            .statement_executor
381            .plan(&stmt, query_ctx.clone())
382            .await
383            .map_err(BoxedError::new)
384            .context(ExecuteQuerySnafu)?;
385
386        interceptor.pre_execute(query, Some(&plan), query_ctx.clone())?;
387
388        let output = self
389            .statement_executor
390            .exec_plan(plan, query_ctx.clone())
391            .await
392            .map_err(BoxedError::new)
393            .context(ExecuteQuerySnafu)?;
394
395        Ok(interceptor.post_execute(output, query_ctx)?)
396    }
397
398    async fn query_metric_names(
399        &self,
400        matchers: Vec<Matcher>,
401        ctx: &QueryContextRef,
402    ) -> server_error::Result<Vec<String>> {
403        self.handle_query_metric_names(matchers, ctx)
404            .await
405            .map_err(BoxedError::new)
406            .context(ExecuteQuerySnafu)
407    }
408
409    async fn query_label_values(
410        &self,
411        metric: String,
412        label_name: String,
413        matchers: Vec<Matcher>,
414        start: SystemTime,
415        end: SystemTime,
416        ctx: &QueryContextRef,
417    ) -> server_error::Result<Vec<String>> {
418        self.handle_query_label_values(metric, label_name, matchers, start, end, ctx)
419            .await
420            .map_err(BoxedError::new)
421            .context(ExecuteQuerySnafu)
422    }
423
424    fn catalog_manager(&self) -> CatalogManagerRef {
425        self.catalog_manager.clone()
426    }
427}
428
/// Validates access to the statement's `database` field when one is given.
///
/// The database is checked against the current catalog of the query context.
/// A rejection is returned from the *enclosing* function (via `?`) as a
/// `SqlExecIntercepted` error; statements without a database are not checked.
macro_rules! validate_db_permission {
    ($show_stmt: expr, $ctx: expr) => {
        match &$show_stmt.database {
            Some(schema) => {
                validate_catalog_and_schema($ctx.current_catalog(), schema, $ctx)
                    .map_err(BoxedError::new)
                    .context(SqlExecInterceptedSnafu)?
            }
            None => {}
        }
    };
}
439
440pub fn check_permission(
441    plugins: Plugins,
442    stmt: &Statement,
443    query_ctx: &QueryContextRef,
444) -> Result<()> {
445    let need_validate = plugins
446        .get::<QueryOptions>()
447        .map(|opts| opts.disallow_cross_catalog_query)
448        .unwrap_or_default();
449
450    if !need_validate {
451        return Ok(());
452    }
453
454    match stmt {
455        // Will be checked in execution.
456        // TODO(dennis): add a hook for admin commands.
457        Statement::Admin(_) => {}
458        // These are executed by query engine, and will be checked there.
459        Statement::Query(_)
460        | Statement::Explain(_)
461        | Statement::Tql(_)
462        | Statement::Delete(_)
463        | Statement::DeclareCursor(_)
464        | Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(_)) => {}
465        // database ops won't be checked
466        Statement::CreateDatabase(_)
467        | Statement::ShowDatabases(_)
468        | Statement::DropDatabase(_)
469        | Statement::AlterDatabase(_)
470        | Statement::DropFlow(_)
471        | Statement::Use(_) => {}
472        Statement::ShowCreateDatabase(stmt) => {
473            validate_database(&stmt.database_name, query_ctx)?;
474        }
475        Statement::ShowCreateTable(stmt) => {
476            validate_param(&stmt.table_name, query_ctx)?;
477        }
478        Statement::ShowCreateFlow(stmt) => {
479            validate_param(&stmt.flow_name, query_ctx)?;
480        }
481        Statement::ShowCreateView(stmt) => {
482            validate_param(&stmt.view_name, query_ctx)?;
483        }
484        Statement::CreateExternalTable(stmt) => {
485            validate_param(&stmt.name, query_ctx)?;
486        }
487        Statement::CreateFlow(stmt) => {
488            // TODO: should also validate source table name here?
489            validate_param(&stmt.sink_table_name, query_ctx)?;
490        }
491        Statement::CreateView(stmt) => {
492            validate_param(&stmt.name, query_ctx)?;
493        }
494        Statement::AlterTable(stmt) => {
495            validate_param(stmt.table_name(), query_ctx)?;
496        }
497        // set/show variable now only alter/show variable in session
498        Statement::SetVariables(_) | Statement::ShowVariables(_) => {}
499        // show charset and show collation won't be checked
500        Statement::ShowCharset(_) | Statement::ShowCollation(_) => {}
501
502        Statement::Insert(insert) => {
503            let name = insert.table_name().context(ParseSqlSnafu)?;
504            validate_param(name, query_ctx)?;
505        }
506        Statement::CreateTable(stmt) => {
507            validate_param(&stmt.name, query_ctx)?;
508        }
509        Statement::CreateTableLike(stmt) => {
510            validate_param(&stmt.table_name, query_ctx)?;
511            validate_param(&stmt.source_name, query_ctx)?;
512        }
513        Statement::DropTable(drop_stmt) => {
514            for table_name in drop_stmt.table_names() {
515                validate_param(table_name, query_ctx)?;
516            }
517        }
518        Statement::DropView(stmt) => {
519            validate_param(&stmt.view_name, query_ctx)?;
520        }
521        Statement::ShowTables(stmt) => {
522            validate_db_permission!(stmt, query_ctx);
523        }
524        Statement::ShowTableStatus(stmt) => {
525            validate_db_permission!(stmt, query_ctx);
526        }
527        Statement::ShowColumns(stmt) => {
528            validate_db_permission!(stmt, query_ctx);
529        }
530        Statement::ShowIndex(stmt) => {
531            validate_db_permission!(stmt, query_ctx);
532        }
533        Statement::ShowRegion(stmt) => {
534            validate_db_permission!(stmt, query_ctx);
535        }
536        Statement::ShowViews(stmt) => {
537            validate_db_permission!(stmt, query_ctx);
538        }
539        Statement::ShowFlows(stmt) => {
540            validate_db_permission!(stmt, query_ctx);
541        }
542        Statement::ShowStatus(_stmt) => {}
543        Statement::ShowSearchPath(_stmt) => {}
544        Statement::DescribeTable(stmt) => {
545            validate_param(stmt.name(), query_ctx)?;
546        }
547        Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => match stmt {
548            CopyTable::To(copy_table_to) => validate_param(&copy_table_to.table_name, query_ctx)?,
549            CopyTable::From(copy_table_from) => {
550                validate_param(&copy_table_from.table_name, query_ctx)?
551            }
552        },
553        Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
554            match copy_database {
555                CopyDatabase::To(stmt) => validate_database(&stmt.database_name, query_ctx)?,
556                CopyDatabase::From(stmt) => validate_database(&stmt.database_name, query_ctx)?,
557            }
558        }
559        Statement::TruncateTable(stmt) => {
560            validate_param(stmt.table_name(), query_ctx)?;
561        }
562        // cursor operations are always allowed once it's created
563        Statement::FetchCursor(_) | Statement::CloseCursor(_) => {}
564    }
565    Ok(())
566}
567
568fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
569    let (catalog, schema, _) = table_idents_to_full_name(name, query_ctx)
570        .map_err(BoxedError::new)
571        .context(ExternalSnafu)?;
572
573    validate_catalog_and_schema(&catalog, &schema, query_ctx)
574        .map_err(BoxedError::new)
575        .context(SqlExecInterceptedSnafu)
576}
577
578fn validate_database(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
579    let (catalog, schema) = match &name.0[..] {
580        [schema] => (
581            query_ctx.current_catalog().to_string(),
582            schema.value.clone(),
583        ),
584        [catalog, schema] => (catalog.value.clone(), schema.value.clone()),
585        _ => InvalidSqlSnafu {
586            err_msg: format!(
587                "expect database name to be <catalog>.<schema> or <schema>, actual: {name}",
588            ),
589        }
590        .fail()?,
591    };
592
593    validate_catalog_and_schema(&catalog, &schema, query_ctx)
594        .map_err(BoxedError::new)
595        .context(SqlExecInterceptedSnafu)
596}
597
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use common_base::Plugins;
    use query::query_engine::options::QueryOptions;
    use session::context::QueryContext;
    use sql::dialect::GreptimeDbDialect;
    use strfmt::Format;

    use super::*;

    /// Exercises `check_permission` with cross-catalog queries disallowed.
    #[test]
    fn test_exec_validation() {
        /// Fills the `{catalog}` and `{schema}` placeholders of `template`.
        fn do_fmt(template: &str, catalog: &str, schema: &str) -> String {
            let vars = HashMap::from([
                ("catalog".to_string(), catalog),
                ("schema".to_string(), schema),
            ]);
            template.format(&vars).unwrap()
        }

        /// Checks the first statement of `sql` and compares the outcome with
        /// `is_ok`.
        fn do_test(sql: &str, plugins: Plugins, query_ctx: &QueryContextRef, is_ok: bool) {
            let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
            let re = check_permission(plugins, &stmts[0], query_ctx);
            if is_ok {
                re.unwrap();
            } else {
                assert!(re.is_err());
            }
        }

        /// Runs `template_sql` with accepted and then rejected
        /// catalog/schema prefixes.
        fn replace_test(template_sql: &str, plugins: Plugins, query_ctx: &QueryContextRef) {
            // test right
            for (catalog, schema) in [("", ""), ("", "public."), ("greptime.", "public.")] {
                let sql = do_fmt(template_sql, catalog, schema);
                do_test(&sql, plugins.clone(), query_ctx, true);
            }

            // test wrong
            for (catalog, schema) in [
                ("wrongcatalog.", "public."),
                ("wrongcatalog.", "wrongschema."),
            ] {
                let sql = do_fmt(template_sql, catalog, schema);
                do_test(&sql, plugins.clone(), query_ctx, false);
            }
        }

        let query_ctx = QueryContext::arc();
        let plugins: Plugins = Plugins::new();
        plugins.insert(QueryOptions {
            disallow_cross_catalog_query: true,
        });

        // statements that are never rejected here
        let sql = r#"
        SELECT * FROM demo;
        EXPLAIN SELECT * FROM demo;
        CREATE DATABASE test_database;
        SHOW DATABASES;
        "#;
        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        assert_eq!(stmts.len(), 4);
        for stmt in stmts {
            check_permission(plugins.clone(), &stmt, &query_ctx).unwrap();
        }

        // unqualified table names resolve to the current catalog/schema
        let sql = r#"
        SHOW CREATE TABLE demo;
        ALTER TABLE demo ADD COLUMN new_col INT;
        "#;
        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        assert_eq!(stmts.len(), 2);
        for stmt in stmts {
            check_permission(plugins.clone(), &stmt, &query_ctx).unwrap();
        }

        // test insert
        let sql = "INSERT INTO {catalog}{schema}monitor(host) VALUES ('host1');";
        replace_test(sql, plugins.clone(), &query_ctx);

        // test create table
        let sql = r#"CREATE TABLE {catalog}{schema}demo(
                            host STRING,
                            ts TIMESTAMP,
                            TIME INDEX (ts),
                            PRIMARY KEY(host)
                        ) engine=mito;"#;
        replace_test(sql, plugins.clone(), &query_ctx);

        // test drop table
        let sql = "DROP TABLE {catalog}{schema}demo;";
        replace_test(sql, plugins.clone(), &query_ctx);

        // test show tables
        let sql = "SHOW TABLES FROM public";
        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        check_permission(plugins.clone(), &stmts[0], &query_ctx).unwrap();

        let sql = "SHOW TABLES FROM private";
        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        let re = check_permission(plugins.clone(), &stmts[0], &query_ctx);
        assert!(re.is_ok());

        // test describe table
        let sql = "DESC TABLE {catalog}{schema}demo;";
        replace_test(sql, plugins, &query_ctx);
    }
}