frontend/
instance.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod builder;
16mod grpc;
17mod influxdb;
18mod jaeger;
19mod log_handler;
20mod logs;
21mod opentsdb;
22mod otlp;
23pub mod prom_store;
24mod promql;
25mod region_query;
26pub mod standalone;
27
28use std::sync::Arc;
29use std::time::SystemTime;
30
31use async_trait::async_trait;
32use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
33use catalog::process_manager::ProcessManagerRef;
34use catalog::CatalogManagerRef;
35use client::OutputData;
36use common_base::cancellation::CancellableFuture;
37use common_base::Plugins;
38use common_config::KvBackendConfig;
39use common_error::ext::{BoxedError, ErrorExt};
40use common_meta::key::runtime_switch::RuntimeSwitchManager;
41use common_meta::key::TableMetadataManagerRef;
42use common_meta::kv_backend::KvBackendRef;
43use common_meta::state_store::KvStateStore;
44use common_procedure::local::{LocalManager, ManagerConfig};
45use common_procedure::options::ProcedureConfig;
46use common_procedure::ProcedureManagerRef;
47use common_query::Output;
48use common_telemetry::{debug, error, info, tracing};
49use datafusion_expr::LogicalPlan;
50use log_store::raft_engine::RaftEngineBackend;
51use operator::delete::DeleterRef;
52use operator::insert::InserterRef;
53use operator::statement::{StatementExecutor, StatementExecutorRef};
54use pipeline::pipeline_operator::PipelineOperator;
55use prometheus::HistogramTimer;
56use promql_parser::label::Matcher;
57use query::metrics::OnDone;
58use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
59use query::query_engine::options::{validate_catalog_and_schema, QueryOptions};
60use query::query_engine::DescribeResult;
61use query::QueryEngineRef;
62use servers::error as server_error;
63use servers::error::{AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu};
64use servers::interceptor::{
65    PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
66};
67use servers::prometheus_handler::PrometheusHandler;
68use servers::query_handler::sql::SqlQueryHandler;
69use session::context::QueryContextRef;
70use session::table_name::table_idents_to_full_name;
71use snafu::prelude::*;
72use sql::dialect::Dialect;
73use sql::parser::{ParseOptions, ParserContext};
74use sql::statements::copy::{CopyDatabase, CopyTable};
75use sql::statements::statement::Statement;
76use sqlparser::ast::ObjectName;
77pub use standalone::StandaloneDatanodeManager;
78
79use crate::error::{
80    self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, InvalidSqlSnafu,
81    ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
82    TableOperationSnafu,
83};
84use crate::limiter::LimiterRef;
85use crate::slow_query_recorder::SlowQueryRecorder;
86use crate::stream_wrapper::CancellableStreamWrapper;
87
/// The frontend instance contains necessary components, and implements many
/// traits, like [`servers::query_handler::grpc::GrpcQueryHandler`],
/// [`servers::query_handler::sql::SqlQueryHandler`], etc.
#[derive(Clone)]
pub struct Instance {
    /// Catalog access; used to check whether a schema exists and handed out by
    /// the `catalog_manager` accessors.
    catalog_manager: CatalogManagerRef,
    /// NOTE(review): not referenced in this part of the file — presumably used
    /// by the log/pipeline handlers in the submodules; confirm there.
    pipeline_operator: Arc<PipelineOperator>,
    /// Plans and executes SQL, TQL and PromQL statements.
    statement_executor: Arc<StatementExecutor>,
    /// Executes already-built logical plans, and plans/describes statements
    /// for `do_describe`.
    query_engine: QueryEngineRef,
    /// Plugin registry; looked up for query interceptors, the permission
    /// checker and `QueryOptions`.
    plugins: Plugins,
    /// Exposed through [`Instance::inserter`].
    inserter: InserterRef,
    /// NOTE(review): not referenced in this part of the file — presumably used
    /// by delete handling elsewhere; confirm.
    deleter: DeleterRef,
    /// Exposed through [`Instance::table_metadata_manager`].
    table_metadata_manager: TableMetadataManagerRef,
    /// When present, a recording is started for every SQL/PromQL query.
    slow_query_recorder: Option<SlowQueryRecorder>,
    /// NOTE(review): not referenced in this part of the file — presumably a
    /// request limiter used by the ingestion handlers; confirm.
    limiter: Option<LimiterRef>,
    /// Registers running queries and yields the ticket used for cancellation.
    process_manager: ProcessManagerRef,
}
105
106impl Instance {
107    pub async fn try_build_standalone_components(
108        dir: String,
109        kv_backend_config: KvBackendConfig,
110        procedure_config: ProcedureConfig,
111    ) -> Result<(KvBackendRef, ProcedureManagerRef)> {
112        info!(
113            "Creating metadata kvbackend with config: {:?}",
114            kv_backend_config
115        );
116        let kv_backend = RaftEngineBackend::try_open_with_cfg(dir, &kv_backend_config)
117            .map_err(BoxedError::new)
118            .context(error::OpenRaftEngineBackendSnafu)?;
119
120        let kv_backend = Arc::new(kv_backend);
121        let kv_state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
122
123        let manager_config = ManagerConfig {
124            max_retry_times: procedure_config.max_retry_times,
125            retry_delay: procedure_config.retry_delay,
126            max_running_procedures: procedure_config.max_running_procedures,
127            ..Default::default()
128        };
129        let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend.clone()));
130        let procedure_manager = Arc::new(LocalManager::new(
131            manager_config,
132            kv_state_store.clone(),
133            kv_state_store,
134            Some(runtime_switch_manager),
135        ));
136
137        Ok((kv_backend, procedure_manager))
138    }
139
140    pub fn catalog_manager(&self) -> &CatalogManagerRef {
141        &self.catalog_manager
142    }
143
144    pub fn query_engine(&self) -> &QueryEngineRef {
145        &self.query_engine
146    }
147
148    pub fn plugins(&self) -> &Plugins {
149        &self.plugins
150    }
151
152    pub fn statement_executor(&self) -> &StatementExecutorRef {
153        &self.statement_executor
154    }
155
156    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
157        &self.table_metadata_manager
158    }
159
160    pub fn inserter(&self) -> &InserterRef {
161        &self.inserter
162    }
163
164    pub fn process_manager(&self) -> &ProcessManagerRef {
165        &self.process_manager
166    }
167}
168
169fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
170    ParserContext::create_with_dialect(sql, dialect, ParseOptions::default()).context(ParseSqlSnafu)
171}
172
173impl Instance {
174    async fn query_statement(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
175        check_permission(self.plugins.clone(), &stmt, &query_ctx)?;
176
177        let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
178        let query_interceptor = query_interceptor.as_ref();
179
180        let _slow_query_timer = if let Some(recorder) = &self.slow_query_recorder {
181            recorder.start(QueryStatement::Sql(stmt.clone()), query_ctx.clone())
182        } else {
183            None
184        };
185
186        let ticket = self.process_manager.register_query(
187            query_ctx.current_catalog().to_string(),
188            vec![query_ctx.current_schema()],
189            stmt.to_string(),
190            query_ctx.conn_info().to_string(),
191            Some(query_ctx.process_id()),
192        );
193
194        let query_fut = async {
195            match stmt {
196                Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
197                    // TODO: remove this when format is supported in datafusion
198                    if let Statement::Explain(explain) = &stmt {
199                        if let Some(format) = explain.format() {
200                            query_ctx.set_explain_format(format.to_string());
201                        }
202                    }
203
204                    let stmt = QueryStatement::Sql(stmt);
205                    let plan = self
206                        .statement_executor
207                        .plan(&stmt, query_ctx.clone())
208                        .await?;
209
210                    let QueryStatement::Sql(stmt) = stmt else {
211                        unreachable!()
212                    };
213                    query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?;
214                    self.statement_executor
215                        .exec_plan(plan, query_ctx)
216                        .await
217                        .context(TableOperationSnafu)
218                }
219                Statement::Tql(tql) => {
220                    let plan = self
221                        .statement_executor
222                        .plan_tql(tql.clone(), &query_ctx)
223                        .await?;
224
225                    query_interceptor.pre_execute(
226                        &Statement::Tql(tql),
227                        Some(&plan),
228                        query_ctx.clone(),
229                    )?;
230                    self.statement_executor
231                        .exec_plan(plan, query_ctx)
232                        .await
233                        .context(TableOperationSnafu)
234                }
235                _ => {
236                    query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?;
237                    self.statement_executor
238                        .execute_sql(stmt, query_ctx)
239                        .await
240                        .context(TableOperationSnafu)
241                }
242            }
243        };
244
245        CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
246            .await
247            .map_err(|_| error::CancelledSnafu.build())?
248            .map(|output| {
249                let Output { meta, data } = output;
250
251                let data = match data {
252                    OutputData::Stream(stream) => {
253                        OutputData::Stream(Box::pin(CancellableStreamWrapper::new(stream, ticket)))
254                    }
255                    other => other,
256                };
257                Output { data, meta }
258            })
259    }
260}
261
262#[async_trait]
263impl SqlQueryHandler for Instance {
264    type Error = Error;
265
266    #[tracing::instrument(skip_all)]
267    async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
268        let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
269        let query_interceptor = query_interceptor_opt.as_ref();
270        let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) {
271            Ok(q) => q,
272            Err(e) => return vec![Err(e)],
273        };
274
275        let checker_ref = self.plugins.get::<PermissionCheckerRef>();
276        let checker = checker_ref.as_ref();
277
278        match parse_stmt(query.as_ref(), query_ctx.sql_dialect())
279            .and_then(|stmts| query_interceptor.post_parsing(stmts, query_ctx.clone()))
280        {
281            Ok(stmts) => {
282                let mut results = Vec::with_capacity(stmts.len());
283                for stmt in stmts {
284                    if let Err(e) = checker
285                        .check_permission(
286                            query_ctx.current_user(),
287                            PermissionReq::SqlStatement(&stmt),
288                        )
289                        .context(PermissionSnafu)
290                    {
291                        results.push(Err(e));
292                        break;
293                    }
294
295                    match self.query_statement(stmt.clone(), query_ctx.clone()).await {
296                        Ok(output) => {
297                            let output_result =
298                                query_interceptor.post_execute(output, query_ctx.clone());
299                            results.push(output_result);
300                        }
301                        Err(e) => {
302                            if e.status_code().should_log_error() {
303                                error!(e; "Failed to execute query: {stmt}");
304                            } else {
305                                debug!("Failed to execute query: {stmt}, {e}");
306                            }
307                            results.push(Err(e));
308                            break;
309                        }
310                    }
311                }
312                results
313            }
314            Err(e) => {
315                vec![Err(e)]
316            }
317        }
318    }
319
320    async fn do_exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
321        // plan should be prepared before exec
322        // we'll do check there
323        self.query_engine
324            .execute(plan.clone(), query_ctx)
325            .await
326            .context(ExecLogicalPlanSnafu)
327    }
328
329    #[tracing::instrument(skip_all)]
330    async fn do_promql_query(
331        &self,
332        query: &PromQuery,
333        query_ctx: QueryContextRef,
334    ) -> Vec<Result<Output>> {
335        // check will be done in prometheus handler's do_query
336        let result = PrometheusHandler::do_query(self, query, query_ctx)
337            .await
338            .with_context(|_| ExecutePromqlSnafu {
339                query: format!("{query:?}"),
340            });
341        vec![result]
342    }
343
344    async fn do_describe(
345        &self,
346        stmt: Statement,
347        query_ctx: QueryContextRef,
348    ) -> Result<Option<DescribeResult>> {
349        if matches!(
350            stmt,
351            Statement::Insert(_) | Statement::Query(_) | Statement::Delete(_)
352        ) {
353            self.plugins
354                .get::<PermissionCheckerRef>()
355                .as_ref()
356                .check_permission(query_ctx.current_user(), PermissionReq::SqlStatement(&stmt))
357                .context(PermissionSnafu)?;
358
359            let plan = self
360                .query_engine
361                .planner()
362                .plan(&QueryStatement::Sql(stmt), query_ctx.clone())
363                .await
364                .context(PlanStatementSnafu)?;
365            self.query_engine
366                .describe(plan, query_ctx)
367                .await
368                .map(Some)
369                .context(error::DescribeStatementSnafu)
370        } else {
371            Ok(None)
372        }
373    }
374
375    async fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result<bool> {
376        self.catalog_manager
377            .schema_exists(catalog, schema, None)
378            .await
379            .context(error::CatalogSnafu)
380    }
381}
382
383/// Attaches a timer to the output and observes it once the output is exhausted.
384pub fn attach_timer(output: Output, timer: HistogramTimer) -> Output {
385    match output.data {
386        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
387        OutputData::Stream(stream) => {
388            let stream = OnDone::new(stream, move || {
389                timer.observe_duration();
390            });
391            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
392        }
393    }
394}
395
396#[async_trait]
397impl PrometheusHandler for Instance {
398    #[tracing::instrument(skip_all)]
399    async fn do_query(
400        &self,
401        query: &PromQuery,
402        query_ctx: QueryContextRef,
403    ) -> server_error::Result<Output> {
404        let interceptor = self
405            .plugins
406            .get::<PromQueryInterceptorRef<server_error::Error>>();
407
408        self.plugins
409            .get::<PermissionCheckerRef>()
410            .as_ref()
411            .check_permission(query_ctx.current_user(), PermissionReq::PromQuery)
412            .context(AuthSnafu)?;
413
414        let stmt = QueryLanguageParser::parse_promql(query, &query_ctx).with_context(|_| {
415            ParsePromQLSnafu {
416                query: query.clone(),
417            }
418        })?;
419
420        let _slow_query_timer = if let Some(recorder) = &self.slow_query_recorder {
421            recorder.start(stmt.clone(), query_ctx.clone())
422        } else {
423            None
424        };
425
426        let plan = self
427            .statement_executor
428            .plan(&stmt, query_ctx.clone())
429            .await
430            .map_err(BoxedError::new)
431            .context(ExecuteQuerySnafu)?;
432
433        interceptor.pre_execute(query, Some(&plan), query_ctx.clone())?;
434
435        let output = self
436            .statement_executor
437            .exec_plan(plan, query_ctx.clone())
438            .await
439            .map_err(BoxedError::new)
440            .context(ExecuteQuerySnafu)?;
441
442        Ok(interceptor.post_execute(output, query_ctx)?)
443    }
444
445    async fn query_metric_names(
446        &self,
447        matchers: Vec<Matcher>,
448        ctx: &QueryContextRef,
449    ) -> server_error::Result<Vec<String>> {
450        self.handle_query_metric_names(matchers, ctx)
451            .await
452            .map_err(BoxedError::new)
453            .context(ExecuteQuerySnafu)
454    }
455
456    async fn query_label_values(
457        &self,
458        metric: String,
459        label_name: String,
460        matchers: Vec<Matcher>,
461        start: SystemTime,
462        end: SystemTime,
463        ctx: &QueryContextRef,
464    ) -> server_error::Result<Vec<String>> {
465        self.handle_query_label_values(metric, label_name, matchers, start, end, ctx)
466            .await
467            .map_err(BoxedError::new)
468            .context(ExecuteQuerySnafu)
469    }
470
471    fn catalog_manager(&self) -> CatalogManagerRef {
472        self.catalog_manager.clone()
473    }
474}
475
/// Validates the database named by `$stmt.database`, if the statement names
/// one, against the current catalog of `$query_ctx`.
///
/// The expansion uses `?`, so the macro can only be used inside a function
/// whose error type accepts the `SqlExecIntercepted` error. Statements without
/// a database expand to a no-op.
macro_rules! validate_db_permission {
    ($statement:expr, $ctx:expr) => {
        if let Some(db_name) = &$statement.database {
            validate_catalog_and_schema($ctx.current_catalog(), db_name, $ctx)
                .map_err(BoxedError::new)
                .context(SqlExecInterceptedSnafu)?;
        }
    };
}
486
487pub fn check_permission(
488    plugins: Plugins,
489    stmt: &Statement,
490    query_ctx: &QueryContextRef,
491) -> Result<()> {
492    let need_validate = plugins
493        .get::<QueryOptions>()
494        .map(|opts| opts.disallow_cross_catalog_query)
495        .unwrap_or_default();
496
497    if !need_validate {
498        return Ok(());
499    }
500
501    match stmt {
502        // Will be checked in execution.
503        // TODO(dennis): add a hook for admin commands.
504        Statement::Admin(_) => {}
505        // These are executed by query engine, and will be checked there.
506        Statement::Query(_)
507        | Statement::Explain(_)
508        | Statement::Tql(_)
509        | Statement::Delete(_)
510        | Statement::DeclareCursor(_)
511        | Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(_)) => {}
512        // database ops won't be checked
513        Statement::CreateDatabase(_)
514        | Statement::ShowDatabases(_)
515        | Statement::DropDatabase(_)
516        | Statement::AlterDatabase(_)
517        | Statement::DropFlow(_)
518        | Statement::Use(_) => {}
519        #[cfg(feature = "enterprise")]
520        Statement::DropTrigger(_) => {}
521        Statement::ShowCreateDatabase(stmt) => {
522            validate_database(&stmt.database_name, query_ctx)?;
523        }
524        Statement::ShowCreateTable(stmt) => {
525            validate_param(&stmt.table_name, query_ctx)?;
526        }
527        Statement::ShowCreateFlow(stmt) => {
528            validate_param(&stmt.flow_name, query_ctx)?;
529        }
530        Statement::ShowCreateView(stmt) => {
531            validate_param(&stmt.view_name, query_ctx)?;
532        }
533        Statement::CreateExternalTable(stmt) => {
534            validate_param(&stmt.name, query_ctx)?;
535        }
536        Statement::CreateFlow(stmt) => {
537            // TODO: should also validate source table name here?
538            validate_param(&stmt.sink_table_name, query_ctx)?;
539        }
540        #[cfg(feature = "enterprise")]
541        Statement::CreateTrigger(stmt) => {
542            validate_param(&stmt.trigger_name, query_ctx)?;
543        }
544        Statement::CreateView(stmt) => {
545            validate_param(&stmt.name, query_ctx)?;
546        }
547        Statement::AlterTable(stmt) => {
548            validate_param(stmt.table_name(), query_ctx)?;
549        }
550        // set/show variable now only alter/show variable in session
551        Statement::SetVariables(_) | Statement::ShowVariables(_) => {}
552        // show charset and show collation won't be checked
553        Statement::ShowCharset(_) | Statement::ShowCollation(_) => {}
554
555        Statement::Insert(insert) => {
556            let name = insert.table_name().context(ParseSqlSnafu)?;
557            validate_param(name, query_ctx)?;
558        }
559        Statement::CreateTable(stmt) => {
560            validate_param(&stmt.name, query_ctx)?;
561        }
562        Statement::CreateTableLike(stmt) => {
563            validate_param(&stmt.table_name, query_ctx)?;
564            validate_param(&stmt.source_name, query_ctx)?;
565        }
566        Statement::DropTable(drop_stmt) => {
567            for table_name in drop_stmt.table_names() {
568                validate_param(table_name, query_ctx)?;
569            }
570        }
571        Statement::DropView(stmt) => {
572            validate_param(&stmt.view_name, query_ctx)?;
573        }
574        Statement::ShowTables(stmt) => {
575            validate_db_permission!(stmt, query_ctx);
576        }
577        Statement::ShowTableStatus(stmt) => {
578            validate_db_permission!(stmt, query_ctx);
579        }
580        Statement::ShowColumns(stmt) => {
581            validate_db_permission!(stmt, query_ctx);
582        }
583        Statement::ShowIndex(stmt) => {
584            validate_db_permission!(stmt, query_ctx);
585        }
586        Statement::ShowRegion(stmt) => {
587            validate_db_permission!(stmt, query_ctx);
588        }
589        Statement::ShowViews(stmt) => {
590            validate_db_permission!(stmt, query_ctx);
591        }
592        Statement::ShowFlows(stmt) => {
593            validate_db_permission!(stmt, query_ctx);
594        }
595        #[cfg(feature = "enterprise")]
596        Statement::ShowTriggers(_stmt) => {
597            // The trigger is organized based on the catalog dimension, so there
598            // is no need to check the permission of the database(schema).
599        }
600        Statement::ShowStatus(_stmt) => {}
601        Statement::ShowSearchPath(_stmt) => {}
602        Statement::DescribeTable(stmt) => {
603            validate_param(stmt.name(), query_ctx)?;
604        }
605        Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => match stmt {
606            CopyTable::To(copy_table_to) => validate_param(&copy_table_to.table_name, query_ctx)?,
607            CopyTable::From(copy_table_from) => {
608                validate_param(&copy_table_from.table_name, query_ctx)?
609            }
610        },
611        Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
612            match copy_database {
613                CopyDatabase::To(stmt) => validate_database(&stmt.database_name, query_ctx)?,
614                CopyDatabase::From(stmt) => validate_database(&stmt.database_name, query_ctx)?,
615            }
616        }
617        Statement::TruncateTable(stmt) => {
618            validate_param(stmt.table_name(), query_ctx)?;
619        }
620        // cursor operations are always allowed once it's created
621        Statement::FetchCursor(_) | Statement::CloseCursor(_) => {}
622        // User can only kill process in their own catalog.
623        Statement::Kill(_) => {}
624        // SHOW PROCESSLIST
625        Statement::ShowProcesslist(_) => {}
626    }
627    Ok(())
628}
629
630fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
631    let (catalog, schema, _) = table_idents_to_full_name(name, query_ctx)
632        .map_err(BoxedError::new)
633        .context(ExternalSnafu)?;
634
635    validate_catalog_and_schema(&catalog, &schema, query_ctx)
636        .map_err(BoxedError::new)
637        .context(SqlExecInterceptedSnafu)
638}
639
640fn validate_database(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
641    let (catalog, schema) = match &name.0[..] {
642        [schema] => (
643            query_ctx.current_catalog().to_string(),
644            schema.value.clone(),
645        ),
646        [catalog, schema] => (catalog.value.clone(), schema.value.clone()),
647        _ => InvalidSqlSnafu {
648            err_msg: format!(
649                "expect database name to be <catalog>.<schema> or <schema>, actual: {name}",
650            ),
651        }
652        .fail()?,
653    };
654
655    validate_catalog_and_schema(&catalog, &schema, query_ctx)
656        .map_err(BoxedError::new)
657        .context(SqlExecInterceptedSnafu)
658}
659
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use common_base::Plugins;
    use query::query_engine::options::QueryOptions;
    use session::context::QueryContext;
    use sql::dialect::GreptimeDbDialect;
    use strfmt::Format;

    use super::*;

    /// Exercises `check_permission` with cross-catalog queries disallowed.
    #[test]
    fn test_exec_validation() {
        /// Fills the `{catalog}` and `{schema}` placeholders of `template`.
        fn do_fmt(template: &str, catalog: &str, schema: &str) -> String {
            let mut vars = HashMap::new();
            vars.insert("catalog".to_string(), catalog);
            vars.insert("schema".to_string(), schema);
            template.format(&vars).unwrap()
        }

        /// Checks the first statement of `sql` and asserts the outcome.
        fn do_test(sql: &str, plugins: Plugins, query_ctx: &QueryContextRef, is_ok: bool) {
            let parsed = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
            let outcome = check_permission(plugins, &parsed[0], query_ctx);
            if is_ok {
                outcome.unwrap();
            } else {
                assert!(outcome.is_err());
            }
        }

        /// Runs `template_sql` with accepted and rejected name prefixes.
        fn replace_test(template_sql: &str, plugins: Plugins, query_ctx: &QueryContextRef) {
            // Prefixes that must pass.
            let accepted = [("", ""), ("", "public."), ("greptime.", "public.")];
            for (catalog, schema) in accepted {
                let sql = do_fmt(template_sql, catalog, schema);
                do_test(&sql, plugins.clone(), query_ctx, true);
            }

            // Prefixes that must be rejected.
            let rejected = [
                ("wrongcatalog.", "public."),
                ("wrongcatalog.", "wrongschema."),
            ];
            for (catalog, schema) in rejected {
                let sql = do_fmt(template_sql, catalog, schema);
                do_test(&sql, plugins.clone(), query_ctx, false);
            }
        }

        let query_ctx = QueryContext::arc();
        let plugins: Plugins = Plugins::new();
        plugins.insert(QueryOptions {
            disallow_cross_catalog_query: true,
        });

        // Statements that are not validated here at all.
        let sql = r#"
        SELECT * FROM demo;
        EXPLAIN SELECT * FROM demo;
        CREATE DATABASE test_database;
        SHOW DATABASES;
        "#;
        let parsed = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        assert_eq!(parsed.len(), 4);
        for stmt in &parsed {
            check_permission(plugins.clone(), stmt, &query_ctx).unwrap();
        }

        // Statements on an unqualified table name.
        let sql = r#"
        SHOW CREATE TABLE demo;
        ALTER TABLE demo ADD COLUMN new_col INT;
        "#;
        let parsed = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        assert_eq!(parsed.len(), 2);
        for stmt in &parsed {
            check_permission(plugins.clone(), stmt, &query_ctx).unwrap();
        }

        // test insert
        let sql = "INSERT INTO {catalog}{schema}monitor(host) VALUES ('host1');";
        replace_test(sql, plugins.clone(), &query_ctx);

        // test create table
        let sql = r#"CREATE TABLE {catalog}{schema}demo(
                            host STRING,
                            ts TIMESTAMP,
                            TIME INDEX (ts),
                            PRIMARY KEY(host)
                        ) engine=mito;"#;
        replace_test(sql, plugins.clone(), &query_ctx);

        // test drop table
        let sql = "DROP TABLE {catalog}{schema}demo;";
        replace_test(sql, plugins.clone(), &query_ctx);

        // test show tables
        let sql = "SHOW TABLES FROM public";
        let parsed = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        check_permission(plugins.clone(), &parsed[0], &query_ctx).unwrap();

        let sql = "SHOW TABLES FROM private";
        let parsed = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        let outcome = check_permission(plugins.clone(), &parsed[0], &query_ctx);
        assert!(outcome.is_ok());

        // test describe table
        let sql = "DESC TABLE {catalog}{schema}demo;";
        replace_test(sql, plugins, &query_ctx);
    }
}