Skip to main content

frontend/
instance.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod builder;
16mod dashboard;
17mod grpc;
18mod influxdb;
19mod jaeger;
20mod log_handler;
21mod logs;
22mod opentsdb;
23mod otlp;
24pub mod prom_store;
25mod promql;
26mod region_query;
27pub mod standalone;
28
29use std::pin::Pin;
30use std::sync::atomic::AtomicBool;
31use std::sync::{Arc, atomic};
32use std::time::{Duration, SystemTime};
33
34use async_stream::stream;
35use async_trait::async_trait;
36use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
37use catalog::CatalogManagerRef;
38use catalog::process_manager::{
39    ProcessManagerRef, QueryStatement as CatalogQueryStatement, SlowQueryTimer,
40};
41use client::OutputData;
42use common_base::Plugins;
43use common_base::cancellation::CancellableFuture;
44use common_error::ext::{BoxedError, ErrorExt};
45use common_event_recorder::EventRecorderRef;
46use common_meta::cache_invalidator::CacheInvalidatorRef;
47use common_meta::key::TableMetadataManagerRef;
48use common_meta::key::table_name::TableNameKey;
49use common_meta::node_manager::NodeManagerRef;
50use common_meta::procedure_executor::ProcedureExecutorRef;
51use common_query::Output;
52use common_recordbatch::RecordBatchStreamWrapper;
53use common_recordbatch::error::StreamTimeoutSnafu;
54use common_telemetry::logging::SlowQueryOptions;
55use common_telemetry::{debug, error, tracing};
56use dashmap::DashMap;
57use datafusion_expr::LogicalPlan;
58use futures::{Stream, StreamExt};
59use lazy_static::lazy_static;
60use operator::delete::DeleterRef;
61use operator::insert::InserterRef;
62use operator::statement::{StatementExecutor, StatementExecutorRef};
63use partition::manager::PartitionRuleManagerRef;
64use pipeline::pipeline_operator::PipelineOperator;
65use prometheus::HistogramTimer;
66use promql_parser::label::Matcher;
67use query::QueryEngineRef;
68use query::metrics::OnDone;
69use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
70use query::query_engine::DescribeResult;
71use query::query_engine::options::{QueryOptions, validate_catalog_and_schema};
72use servers::error::{
73    self as server_error, AuthSnafu, CommonMetaSnafu, ExecuteQuerySnafu,
74    OtlpMetricModeIncompatibleSnafu, ParsePromQLSnafu, UnexpectedResultSnafu,
75};
76use servers::interceptor::{
77    PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
78};
79use servers::otlp::metrics::legacy_normalize_otlp_name;
80use servers::prometheus_handler::PrometheusHandler;
81use servers::query_handler::sql::SqlQueryHandler;
82use session::context::{Channel, QueryContextRef};
83use session::table_name::table_idents_to_full_name;
84use snafu::prelude::*;
85use sql::ast::ObjectNamePartExt;
86use sql::dialect::Dialect;
87use sql::parser::{ParseOptions, ParserContext};
88use sql::statements::comment::CommentObject;
89use sql::statements::copy::{CopyDatabase, CopyTable};
90use sql::statements::statement::Statement;
91use sql::statements::tql::Tql;
92use sqlparser::ast::ObjectName;
93pub use standalone::StandaloneDatanodeManager;
94use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM};
95use tracing::Span;
96
97use crate::error::{
98    self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, InvalidSqlSnafu,
99    ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
100    StatementTimeoutSnafu, TableOperationSnafu,
101};
102use crate::stream_wrapper::CancellableStreamWrapper;
103
lazy_static! {
    // Default OTLP metric compatibility mode assumed when a table carries no
    // explicit `OTLP_METRIC_COMPAT_KEY` option: such tables are treated as
    // legacy-mode (see `check_otlp_legacy`).
    static ref OTLP_LEGACY_DEFAULT_VALUE: String = "legacy".to_string();
}
107
/// The frontend instance contains necessary components, and implements many
/// traits, like [`servers::query_handler::grpc::GrpcQueryHandler`],
/// [`servers::query_handler::sql::SqlQueryHandler`], etc.
#[derive(Clone)]
pub struct Instance {
    /// Catalog/schema metadata access (e.g. schema-existence checks).
    catalog_manager: CatalogManagerRef,
    /// Pipeline operator.
    /// NOTE(review): not referenced in this file; presumably consumed by the
    /// protocol handler submodules — confirm before documenting further.
    pipeline_operator: Arc<PipelineOperator>,
    /// Plans and executes SQL/TQL statements and logical plans.
    statement_executor: Arc<StatementExecutor>,
    /// Query engine used for plan execution and `DESCRIBE`.
    query_engine: QueryEngineRef,
    /// Plugin registry providing interceptors and permission checkers.
    plugins: Plugins,
    /// Row-insert path; also exposes the node and partition managers.
    inserter: InserterRef,
    /// Row-delete path.
    deleter: DeleterRef,
    /// Table metadata (name/info) lookups, e.g. for the OTLP legacy check.
    table_metadata_manager: TableMetadataManagerRef,
    /// Sink for slow-query events; `None` disables slow-query timers.
    event_recorder: Option<EventRecorderRef>,
    /// Registers running queries so they can be listed and cancelled.
    process_manager: ProcessManagerRef,
    /// Threshold/sampling configuration for slow-query recording.
    slow_query_options: SlowQueryOptions,
    /// When set, new queries are rejected with a "suspended" error.
    suspend: Arc<AtomicBool>,

    // cache for otlp metrics
    // first layer key: db-string
    // key: direct input metric name
    // value: if runs in legacy mode
    otlp_metrics_table_legacy_cache: DashMap<String, DashMap<String, bool>>,
}
132
impl Instance {
    /// Returns the catalog manager.
    pub fn catalog_manager(&self) -> &CatalogManagerRef {
        &self.catalog_manager
    }

    /// Returns the query engine.
    pub fn query_engine(&self) -> &QueryEngineRef {
        &self.query_engine
    }

    /// Returns the plugin registry.
    pub fn plugins(&self) -> &Plugins {
        &self.plugins
    }

    /// Returns the statement executor.
    pub fn statement_executor(&self) -> &StatementExecutorRef {
        &self.statement_executor
    }

    /// Returns the table metadata manager.
    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
        &self.table_metadata_manager
    }

    /// Returns the inserter.
    pub fn inserter(&self) -> &InserterRef {
        &self.inserter
    }

    /// Returns the process manager.
    pub fn process_manager(&self) -> &ProcessManagerRef {
        &self.process_manager
    }

    /// Returns the node manager held by the inserter.
    pub fn node_manager(&self) -> &NodeManagerRef {
        self.inserter.node_manager()
    }

    /// Returns the partition rule manager held by the inserter.
    pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
        self.inserter.partition_manager()
    }

    /// Returns the cache invalidator held by the statement executor.
    pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
        self.statement_executor.cache_invalidator()
    }

    /// Returns the procedure executor held by the statement executor.
    pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
        self.statement_executor.procedure_executor()
    }

    /// Returns the shared suspend flag so external components can toggle it.
    pub fn suspend_state(&self) -> Arc<AtomicBool> {
        self.suspend.clone()
    }

    /// Whether the instance is suspended; suspended instances reject new
    /// queries (see `do_query_inner` / `do_exec_plan_inner`).
    pub(crate) fn is_suspended(&self) -> bool {
        self.suspend.load(atomic::Ordering::Relaxed)
    }
}
186
187fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
188    ParserContext::create_with_dialect(sql, dialect, ParseOptions::default()).context(ParseSqlSnafu)
189}
190
impl Instance {
    /// Executes a single parsed statement.
    ///
    /// Read-only statements are registered with the process manager (so they
    /// can be listed and cancelled) and, when slow-query recording is enabled,
    /// timed with a [`SlowQueryTimer`]. Their streaming results are wrapped so
    /// that cancelling the process ticket also cancels the stream. Non-readonly
    /// statements are executed directly with only the timeout applied.
    async fn query_statement(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
        check_permission(self.plugins.clone(), &stmt, &query_ctx)?;

        let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
        let query_interceptor = query_interceptor.as_ref();

        if stmt.is_readonly() {
            // Only create a timer when slow-query recording is enabled AND an
            // event recorder is configured.
            let slow_query_timer = self
                .slow_query_options
                .enable
                .then(|| self.event_recorder.clone())
                .flatten()
                .map(|event_recorder| {
                    SlowQueryTimer::new(
                        CatalogQueryStatement::Sql(stmt.clone()),
                        self.slow_query_options.threshold,
                        self.slow_query_options.sample_ratio,
                        self.slow_query_options.record_type,
                        event_recorder,
                    )
                });

            let ticket = self.process_manager.register_query(
                query_ctx.current_catalog().to_string(),
                vec![query_ctx.current_schema()],
                stmt.to_string(),
                query_ctx.conn_info().to_string(),
                Some(query_ctx.process_id()),
                slow_query_timer,
            );

            let query_fut = self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor);

            // The ticket's cancellation handle aborts the in-flight future;
            // for streaming outputs the ticket is also attached to the stream
            // so cancellation works during result consumption.
            CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
                .await
                .map_err(|_| error::CancelledSnafu.build())?
                .map(|output| {
                    let Output { meta, data } = output;

                    let data = match data {
                        OutputData::Stream(stream) => OutputData::Stream(Box::pin(
                            CancellableStreamWrapper::new(stream, ticket),
                        )),
                        other => other,
                    };
                    Output { data, meta }
                })
        } else {
            self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor)
                .await
        }
    }

    /// Executes `stmt`, enforcing the session's query timeout when one applies
    /// (see [`derive_timeout`]). The time left over after planning/execution is
    /// attached to the output so that stream consumption is bounded too.
    async fn exec_statement_with_timeout(
        &self,
        stmt: Statement,
        query_ctx: QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
    ) -> Result<Output> {
        let timeout = derive_timeout(&stmt, &query_ctx);
        match timeout {
            Some(timeout) => {
                let start = tokio::time::Instant::now();
                let output = tokio::time::timeout(
                    timeout,
                    self.exec_statement(stmt, query_ctx, query_interceptor),
                )
                .await
                .map_err(|_| StatementTimeoutSnafu.build())??;
                // compute remaining timeout
                let remaining_timeout = timeout.checked_sub(start.elapsed()).unwrap_or_default();
                attach_timeout(output, remaining_timeout)
            }
            None => {
                self.exec_statement(stmt, query_ctx, query_interceptor)
                    .await
            }
        }
    }

    /// Dispatches a statement by kind: queries/EXPLAIN/DELETE go through SQL
    /// planning, TQL goes through the TQL planner, everything else is executed
    /// directly by the statement executor.
    async fn exec_statement(
        &self,
        stmt: Statement,
        query_ctx: QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
    ) -> Result<Output> {
        match stmt {
            Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
                // TODO: remove this when format is supported in datafusion
                if let Statement::Explain(explain) = &stmt
                    && let Some(format) = explain.format()
                {
                    query_ctx.set_explain_format(format.to_string());
                }

                self.plan_and_exec_sql(stmt, &query_ctx, query_interceptor)
                    .await
            }
            Statement::Tql(tql) => {
                self.plan_and_exec_tql(&query_ctx, query_interceptor, tql)
                    .await
            }
            _ => {
                query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?;
                self.statement_executor
                    .execute_sql(stmt, query_ctx)
                    .await
                    .context(TableOperationSnafu)
            }
        }
    }

    /// Plans a SQL statement, lets the interceptor inspect statement + plan,
    /// then executes the plan.
    async fn plan_and_exec_sql(
        &self,
        stmt: Statement,
        query_ctx: &QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
    ) -> Result<Output> {
        let stmt = QueryStatement::Sql(stmt);
        let plan = self
            .statement_executor
            .plan(&stmt, query_ctx.clone())
            .await?;
        // We wrapped the statement ourselves just above, so it must still be
        // the Sql variant.
        let QueryStatement::Sql(stmt) = stmt else {
            unreachable!()
        };
        query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?;

        self.statement_executor
            .exec_plan(plan, query_ctx.clone())
            .await
            .context(TableOperationSnafu)
    }

    /// Plans a TQL statement, lets the interceptor inspect it, then executes
    /// the plan.
    async fn plan_and_exec_tql(
        &self,
        query_ctx: &QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
        tql: Tql,
    ) -> Result<Output> {
        let plan = self
            .statement_executor
            .plan_tql(tql.clone(), query_ctx)
            .await?;
        query_interceptor.pre_execute(&Statement::Tql(tql), Some(&plan), query_ctx.clone())?;
        self.statement_executor
            .exec_plan(plan, query_ctx.clone())
            .await
            .context(TableOperationSnafu)
    }

    /// Decides whether the given OTLP metric names run in legacy mode for the
    /// request's database.
    ///
    /// Consults the per-database in-memory cache first; on a complete miss,
    /// looks up the legacy-normalized table names in table metadata and derives
    /// the mode from each table's `OTLP_METRIC_COMPAT_KEY` option (absent
    /// option means legacy). The result is cached for all `names`. Fails if
    /// legacy and prom-compatible modes are mixed within one request.
    async fn check_otlp_legacy(
        &self,
        names: &[&String],
        ctx: QueryContextRef,
    ) -> server_error::Result<bool> {
        let db_string = ctx.get_db_string();
        // fast cache check
        let cache = self
            .otlp_metrics_table_legacy_cache
            .entry(db_string.clone())
            .or_default();
        if let Some(flag) = fast_legacy_check(&cache, names)? {
            return Ok(flag);
        }
        // release cache reference to avoid lock contention
        drop(cache);

        let catalog = ctx.current_catalog();
        let schema = ctx.current_schema();

        // query legacy table names
        let normalized_names = names
            .iter()
            .map(|n| legacy_normalize_otlp_name(n))
            .collect::<Vec<_>>();
        let table_names = normalized_names
            .iter()
            .map(|n| TableNameKey::new(catalog, &schema, n))
            .collect::<Vec<_>>();
        let table_values = self
            .table_metadata_manager()
            .table_name_manager()
            .batch_get(table_names)
            .await
            .context(CommonMetaSnafu)?;
        let table_ids = table_values
            .into_iter()
            .filter_map(|v| v.map(|vi| vi.table_id()))
            .collect::<Vec<_>>();

        // means no existing table is found, use new mode
        if table_ids.is_empty() {
            let cache = self
                .otlp_metrics_table_legacy_cache
                .entry(db_string)
                .or_default();
            names.iter().for_each(|name| {
                cache.insert((*name).clone(), false);
            });
            return Ok(false);
        }

        // has existing table, check table options
        let table_infos = self
            .table_metadata_manager()
            .table_info_manager()
            .batch_get(&table_ids)
            .await
            .context(CommonMetaSnafu)?;
        let options = table_infos
            .values()
            .map(|info| {
                info.table_info
                    .meta
                    .options
                    .extra_options
                    .get(OTLP_METRIC_COMPAT_KEY)
                    .unwrap_or(&OTLP_LEGACY_DEFAULT_VALUE)
            })
            .collect::<Vec<_>>();
        let cache = self
            .otlp_metrics_table_legacy_cache
            .entry(db_string)
            .or_default();
        if !options.is_empty() {
            // check value consistency
            let has_prom = options.iter().any(|opt| *opt == OTLP_METRIC_COMPAT_PROM);
            let has_legacy = options
                .iter()
                .any(|opt| *opt == OTLP_LEGACY_DEFAULT_VALUE.as_str());
            ensure!(!(has_prom && has_legacy), OtlpMetricModeIncompatibleSnafu);
            let flag = has_legacy;
            names.iter().for_each(|name| {
                cache.insert((*name).clone(), flag);
            });
            Ok(flag)
        } else {
            // no table info, use new mode
            names.iter().for_each(|name| {
                cache.insert((*name).clone(), false);
            });
            Ok(false)
        }
    }
}
438
/// Checks the per-database cache for the legacy-mode flag of the given OTLP
/// metric names.
///
/// Returns `Ok(Some(flag))` when at least one name is cached (the cache is
/// then back-filled so every name in `names` carries the same flag),
/// `Ok(None)` when nothing is cached (the caller must consult table metadata),
/// or an error when cached entries disagree between legacy and prom mode.
fn fast_legacy_check(
    cache: &DashMap<String, bool>,
    names: &[&String],
) -> server_error::Result<Option<bool>> {
    // Each element holds a live read reference into the dashmap.
    let hit_cache = names
        .iter()
        .filter_map(|name| cache.get(*name))
        .collect::<Vec<_>>();
    if !hit_cache.is_empty() {
        let hit_legacy = hit_cache.iter().any(|en| *en.value());
        let hit_prom = hit_cache.iter().any(|en| !*en.value());

        // hit but have true and false, means both legacy and new mode are used
        // we cannot handle this case, so return error
        // add doc links in err msg later
        ensure!(!(hit_legacy && hit_prom), OtlpMetricModeIncompatibleSnafu);

        let flag = hit_legacy;
        // drop hit_cache to release references before inserting to avoid deadlock
        drop(hit_cache);

        // set cache for all names
        names.iter().for_each(|name| {
            if !cache.contains_key(*name) {
                cache.insert((*name).clone(), flag);
            }
        });
        Ok(Some(flag))
    } else {
        Ok(None)
    }
}
471
472/// If the relevant variables are set, the timeout is enforced for all PostgreSQL statements.
473/// For MySQL, it applies only to read-only statements.
474fn derive_timeout(stmt: &Statement, query_ctx: &QueryContextRef) -> Option<Duration> {
475    let query_timeout = query_ctx.query_timeout()?;
476    if query_timeout.is_zero() {
477        return None;
478    }
479    match query_ctx.channel() {
480        Channel::Mysql if stmt.is_readonly() => Some(query_timeout),
481        Channel::Postgres => Some(query_timeout),
482        _ => None,
483    }
484}
485
486/// Derives timeout for plan execution.
487fn derive_timeout_for_plan(plan: &LogicalPlan, query_ctx: &QueryContextRef) -> Option<Duration> {
488    let query_timeout = query_ctx.query_timeout()?;
489    if query_timeout.is_zero() {
490        return None;
491    }
492    match query_ctx.channel() {
493        Channel::Mysql if is_readonly_plan(plan) => Some(query_timeout),
494        Channel::Postgres => Some(query_timeout),
495        _ => None,
496    }
497}
498
/// Enforces `timeout` on `output`.
///
/// Fails immediately with a statement timeout when `timeout` is already zero.
/// Non-streaming outputs are returned unchanged; streaming outputs are wrapped
/// so that the remaining budget is re-checked after every yielded batch and the
/// stream errors out once it is exhausted.
fn attach_timeout(output: Output, mut timeout: Duration) -> Result<Output> {
    if timeout.is_zero() {
        return StatementTimeoutSnafu.fail();
    }

    let output = match output.data {
        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
        OutputData::Stream(mut stream) => {
            let schema = stream.schema();
            let s = Box::pin(stream! {
                let mut start = tokio::time::Instant::now();
                // Each batch must arrive within the remaining budget.
                while let Some(item) = tokio::time::timeout(timeout, stream.next()).await.map_err(|_| StreamTimeoutSnafu.build())? {
                    yield item;

                    // Shrink the budget by the time spent producing this batch.
                    let now = tokio::time::Instant::now();
                    timeout = timeout.checked_sub(now - start).unwrap_or(Duration::ZERO);
                    start = now;
                    // tokio::time::timeout may not return an error immediately when timeout is 0.
                    if timeout.is_zero() {
                        StreamTimeoutSnafu.fail()?;
                    }
                }
            }) as Pin<Box<dyn Stream<Item = _> + Send>>;
            let stream = RecordBatchStreamWrapper {
                schema,
                stream: s,
                output_ordering: None,
                metrics: Default::default(),
                span: Span::current(),
            };
            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
        }
    };

    Ok(output)
}
535
impl Instance {
    /// Parses `query` into statements and executes them in order.
    ///
    /// Interceptor hooks run before parsing, after parsing, and after each
    /// execution. Execution stops at the first failed permission check or
    /// execution error; results gathered so far (plus the error) are returned.
    #[tracing::instrument(skip_all, name = "SqlQueryHandler::do_query")]
    async fn do_query_inner(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
        // Reject new queries while the instance is suspended.
        if self.is_suspended() {
            return vec![error::SuspendedSnafu {}.fail()];
        }

        let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
        let query_interceptor = query_interceptor_opt.as_ref();
        let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) {
            Ok(q) => q,
            Err(e) => return vec![Err(e)],
        };

        let checker_ref = self.plugins.get::<PermissionCheckerRef>();
        let checker = checker_ref.as_ref();

        match parse_stmt(query.as_ref(), query_ctx.sql_dialect())
            .and_then(|stmts| query_interceptor.post_parsing(stmts, query_ctx.clone()))
        {
            Ok(stmts) => {
                if stmts.is_empty() {
                    return vec![
                        InvalidSqlSnafu {
                            err_msg: "empty statements",
                        }
                        .fail(),
                    ];
                }

                let mut results = Vec::with_capacity(stmts.len());
                for stmt in stmts {
                    // Per-statement permission check; a failure aborts the
                    // remainder of the batch.
                    if let Err(e) = checker
                        .check_permission(
                            query_ctx.current_user(),
                            PermissionReq::SqlStatement(&stmt),
                        )
                        .context(PermissionSnafu)
                    {
                        results.push(Err(e));
                        break;
                    }

                    match self.query_statement(stmt.clone(), query_ctx.clone()).await {
                        Ok(output) => {
                            let output_result =
                                query_interceptor.post_execute(output, query_ctx.clone());
                            results.push(output_result);
                        }
                        Err(e) => {
                            // Only log at error level for status codes that
                            // warrant it; otherwise keep it at debug.
                            if e.status_code().should_log_error() {
                                error!(e; "Failed to execute query: {stmt}");
                            } else {
                                debug!("Failed to execute query: {stmt}, {e}");
                            }
                            results.push(Err(e));
                            break;
                        }
                    }
                }
                results
            }
            Err(e) => {
                vec![Err(e)]
            }
        }
    }

    /// Executes a logical plan on the query engine.
    async fn exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
        self.query_engine
            .execute(plan, query_ctx)
            .await
            .context(ExecLogicalPlanSnafu)
    }

    /// Executes a plan, enforcing the session's query timeout when one applies
    /// (see [`derive_timeout_for_plan`]); leftover time is attached to the
    /// output so stream consumption is bounded too.
    async fn exec_plan_with_timeout(
        &self,
        plan: LogicalPlan,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        let timeout = derive_timeout_for_plan(&plan, &query_ctx);
        match timeout {
            Some(timeout) => {
                let start = tokio::time::Instant::now();
                let output = tokio::time::timeout(timeout, self.exec_plan(plan, query_ctx))
                    .await
                    .map_err(|_| StatementTimeoutSnafu.build())??;
                let remaining_timeout = timeout.checked_sub(start.elapsed()).unwrap_or_default();
                attach_timeout(output, remaining_timeout)
            }
            None => self.exec_plan(plan, query_ctx).await,
        }
    }

    /// Executes a pre-built logical plan, mirroring `query_statement`:
    /// read-only plans are registered with the process manager (cancellable,
    /// optionally slow-query timed); all results pass through the SQL query
    /// interceptor's `post_execute` hook.
    async fn do_exec_plan_inner(
        &self,
        plan: LogicalPlan,
        stmt: Option<Statement>,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        ensure!(!self.is_suspended(), error::SuspendedSnafu);

        let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
        let query_interceptor = query_interceptor_opt.as_ref();

        if let Some(ref s) = stmt {
            query_interceptor.pre_execute(s, Some(&plan), query_ctx.clone())?;
        }

        // Human-readable query text for process listing / slow-query records:
        // the original statement when available, otherwise the rendered plan.
        let query = stmt
            .as_ref()
            .map(|s| s.to_string())
            .unwrap_or_else(|| plan.display_indent().to_string());

        let result = if is_readonly_plan(&plan) {
            let slow_query_timer = self
                .slow_query_options
                .enable
                .then(|| self.event_recorder.clone())
                .flatten()
                .map(|event_recorder| {
                    SlowQueryTimer::new(
                        CatalogQueryStatement::Plan(query.clone()),
                        self.slow_query_options.threshold,
                        self.slow_query_options.sample_ratio,
                        self.slow_query_options.record_type,
                        event_recorder,
                    )
                });

            let ticket = self.process_manager.register_query(
                query_ctx.current_catalog().to_string(),
                vec![query_ctx.current_schema()],
                query,
                query_ctx.conn_info().to_string(),
                Some(query_ctx.process_id()),
                slow_query_timer,
            );

            let query_fut = self.exec_plan_with_timeout(plan, query_ctx.clone());

            // Ticket cancellation aborts the future; streams additionally
            // carry the ticket so cancellation works mid-consumption.
            CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
                .await
                .map_err(|_| error::CancelledSnafu.build())?
                .map(|output| {
                    let Output { meta, data } = output;

                    let data = match data {
                        OutputData::Stream(stream) => OutputData::Stream(Box::pin(
                            CancellableStreamWrapper::new(stream, ticket),
                        )),
                        other => other,
                    };
                    Output { data, meta }
                })
        } else {
            self.exec_plan_with_timeout(plan, query_ctx.clone()).await
        };

        result.and_then(|output| query_interceptor.post_execute(output, query_ctx))
    }

    /// Runs a PromQL query by delegating to [`PrometheusHandler::do_query`],
    /// wrapping the single result in a `Vec` for the SQL-handler interface.
    #[tracing::instrument(skip_all, name = "SqlQueryHandler::do_promql_query")]
    async fn do_promql_query_inner(
        &self,
        query: &PromQuery,
        query_ctx: QueryContextRef,
    ) -> Vec<Result<Output>> {
        if self.is_suspended() {
            return vec![error::SuspendedSnafu {}.fail()];
        }

        // check will be done in prometheus handler's do_query
        let result = PrometheusHandler::do_query(self, query, query_ctx)
            .await
            .with_context(|_| ExecutePromqlSnafu {
                query: format!("{query:?}"),
            });
        vec![result]
    }

    /// Describes a statement by planning it, returning `Ok(None)` for
    /// statement kinds that are not planned here.
    async fn do_describe_inner(
        &self,
        stmt: Statement,
        query_ctx: QueryContextRef,
    ) -> Result<Option<DescribeResult>> {
        ensure!(!self.is_suspended(), error::SuspendedSnafu);

        // EXPLAIN / EXPLAIN ANALYZE wrap an inner statement; describe them when the
        // wrapped statement is something we already plan (so that bind parameters
        // in the inner query get their types inferred). See #8029.
        let is_inner_plannable = |s: &Statement| {
            matches!(
                s,
                Statement::Insert(_) | Statement::Query(_) | Statement::Delete(_)
            )
        };
        let plannable = is_inner_plannable(&stmt)
            || matches!(&stmt, Statement::Explain(explain) if is_inner_plannable(explain.statement.as_ref()));

        if plannable {
            self.plugins
                .get::<PermissionCheckerRef>()
                .as_ref()
                .check_permission(query_ctx.current_user(), PermissionReq::SqlStatement(&stmt))
                .context(PermissionSnafu)?;

            let plan = self
                .query_engine
                .planner()
                .plan(&QueryStatement::Sql(stmt), query_ctx.clone())
                .await
                .context(PlanStatementSnafu)?;
            self.query_engine
                .describe(plan, query_ctx)
                .await
                .map(Some)
                .context(error::DescribeStatementSnafu)
        } else {
            Ok(None)
        }
    }

    /// Checks whether `schema` exists under `catalog`.
    async fn is_valid_schema_inner(&self, catalog: &str, schema: &str) -> Result<bool> {
        self.catalog_manager
            .schema_exists(catalog, schema, None)
            .await
            .context(error::CatalogSnafu)
    }
}
766
767#[async_trait]
768impl SqlQueryHandler for Instance {
769    async fn do_query(
770        &self,
771        query: &str,
772        query_ctx: QueryContextRef,
773    ) -> Vec<server_error::Result<Output>> {
774        self.do_query_inner(query, query_ctx)
775            .await
776            .into_iter()
777            .map(|result| result.map_err(BoxedError::new).context(ExecuteQuerySnafu))
778            .collect()
779    }
780
781    async fn do_exec_plan(
782        &self,
783        plan: LogicalPlan,
784        stmt: Option<Statement>,
785        query_ctx: QueryContextRef,
786    ) -> server_error::Result<Output> {
787        self.do_exec_plan_inner(plan, stmt, query_ctx)
788            .await
789            .map_err(BoxedError::new)
790            .context(server_error::ExecutePlanSnafu)
791    }
792
793    async fn do_promql_query(
794        &self,
795        query: &PromQuery,
796        query_ctx: QueryContextRef,
797    ) -> Vec<server_error::Result<Output>> {
798        self.do_promql_query_inner(query, query_ctx)
799            .await
800            .into_iter()
801            .map(|result| result.map_err(BoxedError::new).context(ExecuteQuerySnafu))
802            .collect()
803    }
804
805    async fn do_describe(
806        &self,
807        stmt: Statement,
808        query_ctx: QueryContextRef,
809    ) -> server_error::Result<Option<DescribeResult>> {
810        self.do_describe_inner(stmt, query_ctx)
811            .await
812            .map_err(BoxedError::new)
813            .context(server_error::DescribeStatementSnafu)
814    }
815
816    async fn is_valid_schema(&self, catalog: &str, schema: &str) -> server_error::Result<bool> {
817        self.is_valid_schema_inner(catalog, schema)
818            .await
819            .map_err(BoxedError::new)
820            .context(server_error::CheckDatabaseValiditySnafu)
821    }
822}
823
824/// Attaches a timer to the output and observes it once the output is exhausted.
825pub fn attach_timer(output: Output, timer: HistogramTimer) -> Output {
826    match output.data {
827        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
828        OutputData::Stream(stream) => {
829            let stream = OnDone::new(stream, move || {
830                timer.observe_duration();
831            });
832            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
833        }
834    }
835}
836
#[async_trait]
impl PrometheusHandler for Instance {
    /// Executes a PromQL query end-to-end: permission check, parse, plan,
    /// interceptor hooks, process-manager registration, then cancellable
    /// execution.
    #[tracing::instrument(skip_all)]
    async fn do_query(
        &self,
        query: &PromQuery,
        query_ctx: QueryContextRef,
    ) -> server_error::Result<Output> {
        let interceptor = self
            .plugins
            .get::<PromQueryInterceptorRef<server_error::Error>>();

        // Reject the query up front if the current user lacks PromQL permission.
        self.plugins
            .get::<PermissionCheckerRef>()
            .as_ref()
            .check_permission(query_ctx.current_user(), PermissionReq::PromQuery)
            .context(AuthSnafu)?;

        let stmt = QueryLanguageParser::parse_promql(query, &query_ctx).with_context(|_| {
            ParsePromQLSnafu {
                query: query.clone(),
            }
        })?;

        let plan = self
            .statement_executor
            .plan(&stmt, query_ctx.clone())
            .await
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)?;

        // Give plugins a chance to inspect/veto the query before execution.
        interceptor.pre_execute(query, Some(&plan), query_ctx.clone())?;

        // Take the EvalStmt from the original QueryStatement and use it to create the CatalogQueryStatement.
        let query_statement = if let QueryStatement::Promql(eval_stmt, alias) = stmt {
            CatalogQueryStatement::Promql(eval_stmt, alias)
        } else {
            // It should not happen since the query is already parsed successfully.
            return UnexpectedResultSnafu {
                reason: "The query should always be promql.".to_string(),
            }
            .fail();
        };
        let query = query_statement.to_string();

        // Only build a slow-query timer when slow-query recording is enabled
        // AND an event recorder is configured; otherwise both `then`/`flatten`
        // collapse to `None`.
        let slow_query_timer = self
            .slow_query_options
            .enable
            .then(|| self.event_recorder.clone())
            .flatten()
            .map(|event_recorder| {
                SlowQueryTimer::new(
                    query_statement,
                    self.slow_query_options.threshold,
                    self.slow_query_options.sample_ratio,
                    self.slow_query_options.record_type,
                    event_recorder,
                )
            });

        // Register the running query with the process manager; the returned
        // ticket carries the cancellation handle used below.
        let ticket = self.process_manager.register_query(
            query_ctx.current_catalog().to_string(),
            vec![query_ctx.current_schema()],
            query,
            query_ctx.conn_info().to_string(),
            Some(query_ctx.process_id()),
            slow_query_timer,
        );

        let query_fut = self.statement_executor.exec_plan(plan, query_ctx.clone());

        let output = CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
            .await
            // Cancellation of the future itself surfaces as a `Cancelled` error.
            .map_err(|_| servers::error::CancelledSnafu.build())?
            .map(|output| {
                let Output { meta, data } = output;
                // A streaming result must stay cancellable while it is being
                // consumed, so the ticket is moved into the stream wrapper.
                let data = match data {
                    OutputData::Stream(stream) => {
                        OutputData::Stream(Box::pin(CancellableStreamWrapper::new(stream, ticket)))
                    }
                    other => other,
                };
                Output { data, meta }
            })
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)?;

        Ok(interceptor.post_execute(output, query_ctx)?)
    }

    /// Returns the metric names matching the given matchers, mapping internal
    /// errors to the server error type.
    async fn query_metric_names(
        &self,
        matchers: Vec<Matcher>,
        ctx: &QueryContextRef,
    ) -> server_error::Result<Vec<String>> {
        self.handle_query_metric_names(matchers, ctx)
            .await
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)
    }

    /// Returns the values of `label_name` for `metric` within `[start, end]`,
    /// restricted by `matchers`.
    async fn query_label_values(
        &self,
        metric: String,
        label_name: String,
        matchers: Vec<Matcher>,
        start: SystemTime,
        end: SystemTime,
        ctx: &QueryContextRef,
    ) -> server_error::Result<Vec<String>> {
        self.handle_query_label_values(metric, label_name, matchers, start, end, ctx)
            .await
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)
    }

    fn catalog_manager(&self) -> CatalogManagerRef {
        self.catalog_manager.clone()
    }
}
957
/// Validates `$stmt.database` against the session's catalog if it is present.
///
/// Expands to an early `return Err(..)` (via `?`) on failure, so it may only
/// be used inside functions returning this module's `Result`.
macro_rules! validate_db_permission {
    ($stmt: expr, $query_ctx: expr) => {
        if let Some(database) = &$stmt.database {
            validate_catalog_and_schema($query_ctx.current_catalog(), database, $query_ctx)
                .map_err(BoxedError::new)
                .context(SqlExecInterceptedSnafu)?;
        }
    };
}
968
/// Checks whether `stmt` is allowed to run under `query_ctx`'s current catalog.
///
/// This is a no-op unless the `disallow_cross_catalog_query` query option is
/// enabled via plugins. When enabled, statements that name an object are
/// validated against the session catalog/schema; statements that are checked
/// later (by the query engine or during execution) are explicitly exempted
/// below.
pub fn check_permission(
    plugins: Plugins,
    stmt: &Statement,
    query_ctx: &QueryContextRef,
) -> Result<()> {
    // Missing QueryOptions defaults to "no validation".
    let need_validate = plugins
        .get::<QueryOptions>()
        .map(|opts| opts.disallow_cross_catalog_query)
        .unwrap_or_default();

    if !need_validate {
        return Ok(());
    }

    match stmt {
        // Will be checked in execution.
        // TODO(dennis): add a hook for admin commands.
        Statement::Admin(_) => {}
        // These are executed by query engine, and will be checked there.
        Statement::Query(_)
        | Statement::Explain(_)
        | Statement::Tql(_)
        | Statement::Delete(_)
        | Statement::DeclareCursor(_)
        | Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(_)) => {}
        // database ops won't be checked
        Statement::CreateDatabase(_)
        | Statement::ShowDatabases(_)
        | Statement::DropDatabase(_)
        | Statement::AlterDatabase(_)
        | Statement::DropFlow(_)
        | Statement::Use(_) => {}
        #[cfg(feature = "enterprise")]
        Statement::DropTrigger(_) => {}
        Statement::ShowCreateDatabase(stmt) => {
            validate_database(&stmt.database_name, query_ctx)?;
        }
        Statement::ShowCreateTable(stmt) => {
            validate_param(&stmt.table_name, query_ctx)?;
        }
        Statement::ShowCreateFlow(stmt) => {
            validate_flow(&stmt.flow_name, query_ctx)?;
        }
        #[cfg(feature = "enterprise")]
        Statement::ShowCreateTrigger(stmt) => {
            validate_param(&stmt.trigger_name, query_ctx)?;
        }
        Statement::ShowCreateView(stmt) => {
            validate_param(&stmt.view_name, query_ctx)?;
        }
        Statement::CreateExternalTable(stmt) => {
            validate_param(&stmt.name, query_ctx)?;
        }
        Statement::CreateFlow(stmt) => {
            // TODO: should also validate source table name here?
            validate_param(&stmt.sink_table_name, query_ctx)?;
        }
        #[cfg(feature = "enterprise")]
        Statement::CreateTrigger(stmt) => {
            validate_param(&stmt.trigger_name, query_ctx)?;
        }
        Statement::CreateView(stmt) => {
            validate_param(&stmt.name, query_ctx)?;
        }
        Statement::AlterTable(stmt) => {
            validate_param(stmt.table_name(), query_ctx)?;
        }
        #[cfg(feature = "enterprise")]
        Statement::AlterTrigger(_) => {}
        // set/show variable now only alter/show variable in session
        Statement::SetVariables(_) | Statement::ShowVariables(_) => {}
        // show charset and show collation won't be checked
        Statement::ShowCharset(_) | Statement::ShowCollation(_) => {}

        // COMMENT ON targets a table, column, or flow — validate whichever
        // object is named.
        Statement::Comment(comment) => match &comment.object {
            CommentObject::Table(table) => validate_param(table, query_ctx)?,
            CommentObject::Column { table, .. } => validate_param(table, query_ctx)?,
            CommentObject::Flow(flow) => validate_flow(flow, query_ctx)?,
        },

        Statement::Insert(insert) => {
            let name = insert.table_name().context(ParseSqlSnafu)?;
            validate_param(name, query_ctx)?;
        }
        Statement::CreateTable(stmt) => {
            validate_param(&stmt.name, query_ctx)?;
        }
        // CREATE TABLE ... LIKE must be allowed on both the new table and the
        // source table.
        Statement::CreateTableLike(stmt) => {
            validate_param(&stmt.table_name, query_ctx)?;
            validate_param(&stmt.source_name, query_ctx)?;
        }
        Statement::DropTable(drop_stmt) => {
            for table_name in drop_stmt.table_names() {
                validate_param(table_name, query_ctx)?;
            }
        }
        Statement::DropView(stmt) => {
            validate_param(&stmt.view_name, query_ctx)?;
        }
        Statement::ShowTables(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowTableStatus(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowColumns(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowIndex(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowRegion(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowViews(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowFlows(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        #[cfg(feature = "enterprise")]
        Statement::ShowTriggers(_stmt) => {
            // The trigger is organized based on the catalog dimension, so there
            // is no need to check the permission of the database(schema).
        }
        Statement::ShowStatus(_stmt) => {}
        Statement::ShowSearchPath(_stmt) => {}
        Statement::DescribeTable(stmt) => {
            validate_param(stmt.name(), query_ctx)?;
        }
        Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => match stmt {
            CopyTable::To(copy_table_to) => validate_param(&copy_table_to.table_name, query_ctx)?,
            CopyTable::From(copy_table_from) => {
                validate_param(&copy_table_from.table_name, query_ctx)?
            }
        },
        Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
            match copy_database {
                CopyDatabase::To(stmt) => validate_database(&stmt.database_name, query_ctx)?,
                CopyDatabase::From(stmt) => validate_database(&stmt.database_name, query_ctx)?,
            }
        }
        Statement::TruncateTable(stmt) => {
            validate_param(stmt.table_name(), query_ctx)?;
        }
        // cursor operations are always allowed once it's created
        Statement::FetchCursor(_) | Statement::CloseCursor(_) => {}
        // User can only kill process in their own catalog.
        Statement::Kill(_) => {}
        // SHOW PROCESSLIST
        Statement::ShowProcesslist(_) => {}
    }
    Ok(())
}
1123
1124fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
1125    let (catalog, schema, _) = table_idents_to_full_name(name, query_ctx)
1126        .map_err(BoxedError::new)
1127        .context(ExternalSnafu)?;
1128
1129    validate_catalog_and_schema(&catalog, &schema, query_ctx)
1130        .map_err(BoxedError::new)
1131        .context(SqlExecInterceptedSnafu)
1132}
1133
1134fn validate_flow(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
1135    let catalog = match &name.0[..] {
1136        [_flow] => query_ctx.current_catalog().to_string(),
1137        [catalog, _flow] => catalog.to_string_unquoted(),
1138        _ => {
1139            return InvalidSqlSnafu {
1140                err_msg: format!(
1141                    "expect flow name to be <catalog>.<flow_name> or <flow_name>, actual: {name}",
1142                ),
1143            }
1144            .fail();
1145        }
1146    };
1147
1148    let schema = query_ctx.current_schema();
1149
1150    validate_catalog_and_schema(&catalog, &schema, query_ctx)
1151        .map_err(BoxedError::new)
1152        .context(SqlExecInterceptedSnafu)
1153}
1154
1155fn validate_database(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
1156    let (catalog, schema) = match &name.0[..] {
1157        [schema] => (
1158            query_ctx.current_catalog().to_string(),
1159            schema.to_string_unquoted(),
1160        ),
1161        [catalog, schema] => (catalog.to_string_unquoted(), schema.to_string_unquoted()),
1162        _ => InvalidSqlSnafu {
1163            err_msg: format!(
1164                "expect database name to be <catalog>.<schema> or <schema>, actual: {name}",
1165            ),
1166        }
1167        .fail()?,
1168    };
1169
1170    validate_catalog_and_schema(&catalog, &schema, query_ctx)
1171        .map_err(BoxedError::new)
1172        .context(SqlExecInterceptedSnafu)
1173}
1174
1175fn is_readonly_plan(plan: &LogicalPlan) -> bool {
1176    !matches!(plan, LogicalPlan::Dml(_) | LogicalPlan::Ddl(_))
1177}
1178
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::{Arc, Barrier};
    use std::thread;
    use std::time::{Duration, Instant};

    use common_base::Plugins;
    use query::query_engine::options::QueryOptions;
    use session::context::QueryContext;
    use sql::dialect::GreptimeDbDialect;
    use strfmt::Format;

    use super::*;

    /// Exercises `fast_legacy_check` against a `DashMap` cache: cache hits,
    /// misses, incompatible-mode errors, and heavy concurrent access meant to
    /// surface DashMap shard deadlocks.
    #[test]
    fn test_fast_legacy_check_deadlock_prevention() {
        // Create a DashMap to simulate the cache
        let cache = DashMap::new();

        // Pre-populate cache with some entries
        cache.insert("metric1".to_string(), true); // legacy mode
        cache.insert("metric2".to_string(), false); // prom mode
        cache.insert("metric3".to_string(), true); // legacy mode

        // Test case 1: Normal operation with cache hits
        let metric1 = "metric1".to_string();
        let metric4 = "metric4".to_string();
        let names1 = vec![&metric1, &metric4];
        let result = fast_legacy_check(&cache, &names1);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), Some(true)); // should return legacy mode

        // Verify that metric4 was added to cache
        assert!(cache.contains_key("metric4"));
        assert!(*cache.get("metric4").unwrap().value());

        // Test case 2: No cache hits
        let metric5 = "metric5".to_string();
        let metric6 = "metric6".to_string();
        let names2 = vec![&metric5, &metric6];
        let result = fast_legacy_check(&cache, &names2);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), None); // should return None as no cache hits

        // Test case 3: Incompatible modes should return error
        let cache_incompatible = DashMap::new();
        cache_incompatible.insert("metric1".to_string(), true); // legacy
        cache_incompatible.insert("metric2".to_string(), false); // prom
        let metric1_test = "metric1".to_string();
        let metric2_test = "metric2".to_string();
        let names3 = vec![&metric1_test, &metric2_test];
        let result = fast_legacy_check(&cache_incompatible, &names3);
        assert!(result.is_err()); // should error due to incompatible modes

        // Test case 4: Intensive concurrent access to test deadlock prevention
        // This test specifically targets the scenario where multiple threads
        // access the same cache entries simultaneously
        let cache_concurrent = Arc::new(DashMap::new());
        cache_concurrent.insert("shared_metric".to_string(), true);

        let num_threads = 8;
        let operations_per_thread = 100;
        let barrier = Arc::new(Barrier::new(num_threads));
        let success_flag = Arc::new(AtomicBool::new(true));

        let handles: Vec<_> = (0..num_threads)
            .map(|thread_id| {
                let cache_clone = Arc::clone(&cache_concurrent);
                let barrier_clone = Arc::clone(&barrier);
                let success_flag_clone = Arc::clone(&success_flag);

                thread::spawn(move || {
                    // Wait for all threads to be ready
                    barrier_clone.wait();

                    let start_time = Instant::now();
                    for i in 0..operations_per_thread {
                        // Each operation references existing cache entry and adds new ones
                        let shared_metric = "shared_metric".to_string();
                        let new_metric = format!("thread_{}_metric_{}", thread_id, i);
                        let names = vec![&shared_metric, &new_metric];

                        match fast_legacy_check(&cache_clone, &names) {
                            Ok(_) => {}
                            Err(_) => {
                                success_flag_clone.store(false, Ordering::Relaxed);
                                return;
                            }
                        }

                        // If the test takes too long, it likely means deadlock
                        if start_time.elapsed() > Duration::from_secs(10) {
                            success_flag_clone.store(false, Ordering::Relaxed);
                            return;
                        }
                    }
                })
            })
            .collect();

        // Join all threads with timeout
        // NOTE(review): `handle.join()` blocks indefinitely, so the elapsed
        // check below only runs AFTER a join returns — a genuine deadlock
        // would hang the test here rather than trip the timeout panic.
        let start_time = Instant::now();
        for (i, handle) in handles.into_iter().enumerate() {
            let join_result = handle.join();

            // Check if we're taking too long (potential deadlock)
            if start_time.elapsed() > Duration::from_secs(30) {
                panic!("Test timed out - possible deadlock detected!");
            }

            if join_result.is_err() {
                panic!("Thread {} panicked during execution", i);
            }
        }

        // Verify all operations completed successfully
        assert!(
            success_flag.load(Ordering::Relaxed),
            "Some operations failed"
        );

        // Verify that many new entries were added (proving operations completed)
        let final_count = cache_concurrent.len();
        assert!(
            final_count > 1 + num_threads * operations_per_thread / 2,
            "Expected more cache entries, got {}",
            final_count
        );
    }

    /// Exercises `check_permission` with `disallow_cross_catalog_query`
    /// enabled: exempt statements, object-naming statements with right/wrong
    /// catalogs, and flow-specific validation.
    #[test]
    fn test_exec_validation() {
        let query_ctx = QueryContext::arc();
        let plugins: Plugins = Plugins::new();
        plugins.insert(QueryOptions {
            disallow_cross_catalog_query: true,
        });

        let sql = r#"
        SELECT * FROM demo;
        EXPLAIN SELECT * FROM demo;
        CREATE DATABASE test_database;
        SHOW DATABASES;
        "#;
        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        assert_eq!(stmts.len(), 4);
        for stmt in stmts {
            let re = check_permission(plugins.clone(), &stmt, &query_ctx);
            re.unwrap();
        }

        let sql = r#"
        SHOW CREATE TABLE demo;
        ALTER TABLE demo ADD COLUMN new_col INT;
        "#;
        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        assert_eq!(stmts.len(), 2);
        for stmt in stmts {
            let re = check_permission(plugins.clone(), &stmt, &query_ctx);
            re.unwrap();
        }

        // Runs `template_sql` through check_permission with a mix of
        // allowed (current/default catalog) and rejected (foreign catalog)
        // qualifiers substituted in.
        fn replace_test(template_sql: &str, plugins: Plugins, query_ctx: &QueryContextRef) {
            // test right
            let right = vec![("", ""), ("", "public."), ("greptime.", "public.")];
            for (catalog, schema) in right {
                let sql = do_fmt(template_sql, catalog, schema);
                do_test(&sql, plugins.clone(), query_ctx, true);
            }

            let wrong = vec![
                ("wrongcatalog.", "public."),
                ("wrongcatalog.", "wrongschema."),
            ];
            for (catalog, schema) in wrong {
                let sql = do_fmt(template_sql, catalog, schema);
                do_test(&sql, plugins.clone(), query_ctx, false);
            }
        }

        // Substitutes `{catalog}` / `{schema}` placeholders in the template.
        fn do_fmt(template: &str, catalog: &str, schema: &str) -> String {
            let vars = HashMap::from([
                ("catalog".to_string(), catalog),
                ("schema".to_string(), schema),
            ]);
            template.format(&vars).unwrap()
        }

        // Parses the first statement of `sql` and asserts check_permission's
        // verdict matches `is_ok`.
        fn do_test(sql: &str, plugins: Plugins, query_ctx: &QueryContextRef, is_ok: bool) {
            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
            let re = check_permission(plugins, stmt, query_ctx);
            if is_ok {
                re.unwrap();
            } else {
                assert!(re.is_err());
            }
        }

        // test insert
        let sql = "INSERT INTO {catalog}{schema}monitor(host) VALUES ('host1');";
        replace_test(sql, plugins.clone(), &query_ctx);

        // test create table
        let sql = r#"CREATE TABLE {catalog}{schema}demo(
                            host STRING,
                            ts TIMESTAMP,
                            TIME INDEX (ts),
                            PRIMARY KEY(host)
                        ) engine=mito;"#;
        replace_test(sql, plugins.clone(), &query_ctx);

        // test drop table
        let sql = "DROP TABLE {catalog}{schema}demo;";
        replace_test(sql, plugins.clone(), &query_ctx);

        // test show tables
        let sql = "SHOW TABLES FROM public";
        let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        check_permission(plugins.clone(), &stmt[0], &query_ctx).unwrap();

        // NOTE(review): presumably any schema under the current catalog
        // passes — the check targets cross-catalog access; confirm against
        // validate_catalog_and_schema.
        let sql = "SHOW TABLES FROM private";
        let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        let re = check_permission(plugins.clone(), &stmt[0], &query_ctx);
        assert!(re.is_ok());

        // test describe table
        let sql = "DESC TABLE {catalog}{schema}demo;";
        replace_test(sql, plugins.clone(), &query_ctx);

        // COMMENT ON FLOW accepts bare or current-catalog-qualified names and
        // rejects foreign catalogs.
        let comment_flow_cases = [
            ("COMMENT ON FLOW my_flow IS 'comment';", true),
            ("COMMENT ON FLOW greptime.my_flow IS 'comment';", true),
            ("COMMENT ON FLOW wrongcatalog.my_flow IS 'comment';", false),
        ];
        for (sql, is_ok) in comment_flow_cases {
            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
            let result = check_permission(plugins.clone(), stmt, &query_ctx);
            assert_eq!(result.is_ok(), is_ok);
        }

        // SHOW CREATE FLOW follows the same catalog rules.
        let show_flow_cases = [
            ("SHOW CREATE FLOW my_flow;", true),
            ("SHOW CREATE FLOW greptime.my_flow;", true),
            ("SHOW CREATE FLOW wrongcatalog.my_flow;", false),
        ];
        for (sql, is_ok) in show_flow_cases {
            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
            let result = check_permission(plugins.clone(), stmt, &query_ctx);
            assert_eq!(result.is_ok(), is_ok);
        }
    }
}