// frontend/instance.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod builder;
16mod grpc;
17mod influxdb;
18mod jaeger;
19mod log_handler;
20mod logs;
21mod opentsdb;
22mod otlp;
23pub mod prom_store;
24mod promql;
25mod region_query;
26pub mod standalone;
27
28use std::pin::Pin;
29use std::sync::atomic::AtomicBool;
30use std::sync::{Arc, atomic};
31use std::time::{Duration, SystemTime};
32
33use async_stream::stream;
34use async_trait::async_trait;
35use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
36use catalog::CatalogManagerRef;
37use catalog::process_manager::{
38    ProcessManagerRef, QueryStatement as CatalogQueryStatement, SlowQueryTimer,
39};
40use client::OutputData;
41use common_base::Plugins;
42use common_base::cancellation::CancellableFuture;
43use common_error::ext::{BoxedError, ErrorExt};
44use common_event_recorder::EventRecorderRef;
45use common_meta::cache_invalidator::CacheInvalidatorRef;
46use common_meta::key::TableMetadataManagerRef;
47use common_meta::key::table_name::TableNameKey;
48use common_meta::node_manager::NodeManagerRef;
49use common_meta::procedure_executor::ProcedureExecutorRef;
50use common_query::Output;
51use common_recordbatch::RecordBatchStreamWrapper;
52use common_recordbatch::error::StreamTimeoutSnafu;
53use common_telemetry::logging::SlowQueryOptions;
54use common_telemetry::{debug, error, tracing};
55use dashmap::DashMap;
56use datafusion_expr::LogicalPlan;
57use futures::{Stream, StreamExt};
58use lazy_static::lazy_static;
59use operator::delete::DeleterRef;
60use operator::insert::InserterRef;
61use operator::statement::{StatementExecutor, StatementExecutorRef};
62use partition::manager::PartitionRuleManagerRef;
63use pipeline::pipeline_operator::PipelineOperator;
64use prometheus::HistogramTimer;
65use promql_parser::label::Matcher;
66use query::QueryEngineRef;
67use query::metrics::OnDone;
68use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
69use query::query_engine::DescribeResult;
70use query::query_engine::options::{QueryOptions, validate_catalog_and_schema};
71use servers::error::{
72    self as server_error, AuthSnafu, CommonMetaSnafu, ExecuteQuerySnafu,
73    OtlpMetricModeIncompatibleSnafu, ParsePromQLSnafu, UnexpectedResultSnafu,
74};
75use servers::interceptor::{
76    PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
77};
78use servers::otlp::metrics::legacy_normalize_otlp_name;
79use servers::prometheus_handler::PrometheusHandler;
80use servers::query_handler::sql::SqlQueryHandler;
81use session::context::{Channel, QueryContextRef};
82use session::table_name::table_idents_to_full_name;
83use snafu::prelude::*;
84use sql::ast::ObjectNamePartExt;
85use sql::dialect::Dialect;
86use sql::parser::{ParseOptions, ParserContext};
87use sql::statements::comment::CommentObject;
88use sql::statements::copy::{CopyDatabase, CopyTable};
89use sql::statements::statement::Statement;
90use sql::statements::tql::Tql;
91use sqlparser::ast::ObjectName;
92pub use standalone::StandaloneDatanodeManager;
93use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM};
94use tracing::Span;
95
96use crate::error::{
97    self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, InvalidSqlSnafu,
98    ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
99    StatementTimeoutSnafu, TableOperationSnafu,
100};
101use crate::stream_wrapper::CancellableStreamWrapper;
102
lazy_static! {
    /// Default value assumed for the OTLP metric compatibility table option
    /// (`OTLP_METRIC_COMPAT_KEY`) when a table does not set it explicitly;
    /// also used as the marker value for "legacy" mode comparisons below.
    static ref OTLP_LEGACY_DEFAULT_VALUE: String = "legacy".to_string();
}
106
/// The frontend instance contains necessary components, and implements many
/// traits, like [`servers::query_handler::grpc::GrpcQueryHandler`],
/// [`servers::query_handler::sql::SqlQueryHandler`], etc.
#[derive(Clone)]
pub struct Instance {
    // Catalog/schema/table lookup facade.
    catalog_manager: CatalogManagerRef,
    // Pipeline operations (used by the log/ingest handlers in sibling modules).
    pipeline_operator: Arc<PipelineOperator>,
    // Plans and executes SQL/TQL statements.
    statement_executor: Arc<StatementExecutor>,
    // Underlying query engine used for logical-plan execution and describe.
    query_engine: QueryEngineRef,
    // Plugin registry; source of interceptors and the permission checker.
    plugins: Plugins,
    // Row insertion path; also exposes node/partition managers.
    inserter: InserterRef,
    // Row deletion path.
    deleter: DeleterRef,
    // Access to table metadata keys (used by the OTLP legacy-mode check).
    table_metadata_manager: TableMetadataManagerRef,
    // Optional sink for slow-query events; `None` disables slow-query recording.
    event_recorder: Option<EventRecorderRef>,
    // Registers running queries so they can be listed and cancelled.
    process_manager: ProcessManagerRef,
    // Thresholds/sampling controlling the slow-query timers below.
    slow_query_options: SlowQueryOptions,
    // When set, query entry points fail fast with `SuspendedSnafu`.
    suspend: Arc<AtomicBool>,

    // cache for otlp metrics
    // first layer key: db-string
    // key: direct input metric name
    // value: if runs in legacy mode
    otlp_metrics_table_legacy_cache: DashMap<String, DashMap<String, bool>>,
}
131
impl Instance {
    /// Returns the catalog manager.
    pub fn catalog_manager(&self) -> &CatalogManagerRef {
        &self.catalog_manager
    }

    /// Returns the query engine.
    pub fn query_engine(&self) -> &QueryEngineRef {
        &self.query_engine
    }

    /// Returns the plugin registry.
    pub fn plugins(&self) -> &Plugins {
        &self.plugins
    }

    /// Returns the statement executor.
    pub fn statement_executor(&self) -> &StatementExecutorRef {
        &self.statement_executor
    }

    /// Returns the table metadata manager.
    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
        &self.table_metadata_manager
    }

    /// Returns the inserter.
    pub fn inserter(&self) -> &InserterRef {
        &self.inserter
    }

    /// Returns the process manager used to track and cancel running queries.
    pub fn process_manager(&self) -> &ProcessManagerRef {
        &self.process_manager
    }

    /// Returns the node manager (delegated to the inserter).
    pub fn node_manager(&self) -> &NodeManagerRef {
        self.inserter.node_manager()
    }

    /// Returns the partition rule manager (delegated to the inserter).
    pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
        self.inserter.partition_manager()
    }

    /// Returns the cache invalidator (delegated to the statement executor).
    pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
        self.statement_executor.cache_invalidator()
    }

    /// Returns the procedure executor (delegated to the statement executor).
    pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
        self.statement_executor.procedure_executor()
    }

    /// Returns a shared handle to the suspend flag so callers can toggle it.
    pub fn suspend_state(&self) -> Arc<AtomicBool> {
        self.suspend.clone()
    }

    /// Whether the instance is currently suspended (queries are rejected).
    pub(crate) fn is_suspended(&self) -> bool {
        self.suspend.load(atomic::Ordering::Relaxed)
    }
}
185
186fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
187    ParserContext::create_with_dialect(sql, dialect, ParseOptions::default()).context(ParseSqlSnafu)
188}
189
impl Instance {
    /// Executes a single parsed statement end-to-end: permission check,
    /// optional slow-query timing, process registration (for cancellation),
    /// then execution with the session timeout applied.
    async fn query_statement(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
        check_permission(self.plugins.clone(), &stmt, &query_ctx)?;

        let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
        let query_interceptor = query_interceptor.as_ref();

        if should_capture_statement(Some(&stmt)) {
            // Slow-query timing only when enabled AND an event recorder exists.
            let slow_query_timer = self
                .slow_query_options
                .enable
                .then(|| self.event_recorder.clone())
                .flatten()
                .map(|event_recorder| {
                    SlowQueryTimer::new(
                        CatalogQueryStatement::Sql(stmt.clone()),
                        self.slow_query_options.threshold,
                        self.slow_query_options.sample_ratio,
                        self.slow_query_options.record_type,
                        event_recorder,
                    )
                });

            // Register with the process manager so the query shows up in
            // process listings and can be cancelled via the returned ticket.
            let ticket = self.process_manager.register_query(
                query_ctx.current_catalog().to_string(),
                vec![query_ctx.current_schema()],
                stmt.to_string(),
                query_ctx.conn_info().to_string(),
                Some(query_ctx.process_id()),
                slow_query_timer,
            );

            let query_fut = self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor);

            // Wrap execution so cancellation aborts the future; streaming
            // outputs are additionally wrapped so cancellation also stops
            // an already-running stream.
            CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
                .await
                .map_err(|_| error::CancelledSnafu.build())?
                .map(|output| {
                    let Output { meta, data } = output;

                    let data = match data {
                        OutputData::Stream(stream) => OutputData::Stream(Box::pin(
                            CancellableStreamWrapper::new(stream, ticket),
                        )),
                        other => other,
                    };
                    Output { data, meta }
                })
        } else {
            // Statements not captured by the process manager run directly.
            self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor)
                .await
        }
    }

    /// Executes a statement, enforcing the session query timeout when
    /// [`derive_timeout`] yields one. Any remaining budget is re-attached to
    /// streaming outputs so the stream itself is also bounded.
    async fn exec_statement_with_timeout(
        &self,
        stmt: Statement,
        query_ctx: QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
    ) -> Result<Output> {
        let timeout = derive_timeout(&stmt, &query_ctx);
        match timeout {
            Some(timeout) => {
                let start = tokio::time::Instant::now();
                let output = tokio::time::timeout(
                    timeout,
                    self.exec_statement(stmt, query_ctx, query_interceptor),
                )
                .await
                .map_err(|_| StatementTimeoutSnafu.build())??;
                // compute remaining timeout
                let remaining_timeout = timeout.checked_sub(start.elapsed()).unwrap_or_default();
                attach_timeout(output, remaining_timeout)
            }
            None => {
                self.exec_statement(stmt, query_ctx, query_interceptor)
                    .await
            }
        }
    }

    /// Dispatches a statement: queries/explains/deletes and TQL are planned
    /// then executed through the query engine; everything else goes straight
    /// to the statement executor.
    async fn exec_statement(
        &self,
        stmt: Statement,
        query_ctx: QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
    ) -> Result<Output> {
        match stmt {
            Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
                // TODO: remove this when format is supported in datafusion
                if let Statement::Explain(explain) = &stmt
                    && let Some(format) = explain.format()
                {
                    query_ctx.set_explain_format(format.to_string());
                }

                self.plan_and_exec_sql(stmt, &query_ctx, query_interceptor)
                    .await
            }
            Statement::Tql(tql) => {
                self.plan_and_exec_tql(&query_ctx, query_interceptor, tql)
                    .await
            }
            _ => {
                // No logical plan for DDL-style statements; the interceptor
                // still runs (with `None` as the plan) before execution.
                query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?;
                self.statement_executor
                    .execute_sql(stmt, query_ctx)
                    .await
                    .context(TableOperationSnafu)
            }
        }
    }

    /// Plans a SQL statement into a logical plan, runs the pre-execute
    /// interceptor with the plan, then executes it.
    async fn plan_and_exec_sql(
        &self,
        stmt: Statement,
        query_ctx: &QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
    ) -> Result<Output> {
        let stmt = QueryStatement::Sql(stmt);
        let plan = self
            .statement_executor
            .plan(&stmt, query_ctx.clone())
            .await?;
        // Recover the inner SQL statement; we constructed the Sql variant
        // just above, so no other variant is possible here.
        let QueryStatement::Sql(stmt) = stmt else {
            unreachable!()
        };
        query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?;
        self.statement_executor
            .exec_plan(plan, query_ctx.clone())
            .await
            .context(TableOperationSnafu)
    }

    /// Plans a TQL (PromQL-over-SQL) statement, runs the pre-execute
    /// interceptor with the plan, then executes it.
    async fn plan_and_exec_tql(
        &self,
        query_ctx: &QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
        tql: Tql,
    ) -> Result<Output> {
        let plan = self
            .statement_executor
            .plan_tql(tql.clone(), query_ctx)
            .await?;
        query_interceptor.pre_execute(&Statement::Tql(tql), Some(&plan), query_ctx.clone())?;
        self.statement_executor
            .exec_plan(plan, query_ctx.clone())
            .await
            .context(TableOperationSnafu)
    }

    /// Determines whether OTLP metrics with the given names should be written
    /// in "legacy" mode for this database: first via the per-db cache, then by
    /// looking up existing tables (under legacy-normalized names) and reading
    /// their `OTLP_METRIC_COMPAT_KEY` option. Errors if legacy and prom modes
    /// are mixed across the same batch of names.
    async fn check_otlp_legacy(
        &self,
        names: &[&String],
        ctx: QueryContextRef,
    ) -> server_error::Result<bool> {
        let db_string = ctx.get_db_string();
        // fast cache check
        let cache = self
            .otlp_metrics_table_legacy_cache
            .entry(db_string.clone())
            .or_default();
        if let Some(flag) = fast_legacy_check(&cache, names)? {
            return Ok(flag);
        }
        // release cache reference to avoid lock contention
        drop(cache);

        let catalog = ctx.current_catalog();
        let schema = ctx.current_schema();

        // query legacy table names
        let normalized_names = names
            .iter()
            .map(|n| legacy_normalize_otlp_name(n))
            .collect::<Vec<_>>();
        let table_names = normalized_names
            .iter()
            .map(|n| TableNameKey::new(catalog, &schema, n))
            .collect::<Vec<_>>();
        let table_values = self
            .table_metadata_manager()
            .table_name_manager()
            .batch_get(table_names)
            .await
            .context(CommonMetaSnafu)?;
        let table_ids = table_values
            .into_iter()
            .filter_map(|v| v.map(|vi| vi.table_id()))
            .collect::<Vec<_>>();

        // means no existing table is found, use new mode
        if table_ids.is_empty() {
            let cache = self
                .otlp_metrics_table_legacy_cache
                .entry(db_string)
                .or_default();
            names.iter().for_each(|name| {
                cache.insert((*name).clone(), false);
            });
            return Ok(false);
        }

        // has existing table, check table options
        let table_infos = self
            .table_metadata_manager()
            .table_info_manager()
            .batch_get(&table_ids)
            .await
            .context(CommonMetaSnafu)?;
        let options = table_infos
            .values()
            .map(|info| {
                info.table_info
                    .meta
                    .options
                    .extra_options
                    .get(OTLP_METRIC_COMPAT_KEY)
                    // missing option defaults to "legacy"
                    .unwrap_or(&OTLP_LEGACY_DEFAULT_VALUE)
            })
            .collect::<Vec<_>>();
        let cache = self
            .otlp_metrics_table_legacy_cache
            .entry(db_string)
            .or_default();
        if !options.is_empty() {
            // check value consistency
            let has_prom = options.iter().any(|opt| *opt == OTLP_METRIC_COMPAT_PROM);
            let has_legacy = options
                .iter()
                .any(|opt| *opt == OTLP_LEGACY_DEFAULT_VALUE.as_str());
            ensure!(!(has_prom && has_legacy), OtlpMetricModeIncompatibleSnafu);
            let flag = has_legacy;
            names.iter().for_each(|name| {
                cache.insert((*name).clone(), flag);
            });
            Ok(flag)
        } else {
            // no table info, use new mode
            names.iter().for_each(|name| {
                cache.insert((*name).clone(), false);
            });
            Ok(false)
        }
    }
}
436
437fn fast_legacy_check(
438    cache: &DashMap<String, bool>,
439    names: &[&String],
440) -> server_error::Result<Option<bool>> {
441    let hit_cache = names
442        .iter()
443        .filter_map(|name| cache.get(*name))
444        .collect::<Vec<_>>();
445    if !hit_cache.is_empty() {
446        let hit_legacy = hit_cache.iter().any(|en| *en.value());
447        let hit_prom = hit_cache.iter().any(|en| !*en.value());
448
449        // hit but have true and false, means both legacy and new mode are used
450        // we cannot handle this case, so return error
451        // add doc links in err msg later
452        ensure!(!(hit_legacy && hit_prom), OtlpMetricModeIncompatibleSnafu);
453
454        let flag = hit_legacy;
455        // drop hit_cache to release references before inserting to avoid deadlock
456        drop(hit_cache);
457
458        // set cache for all names
459        names.iter().for_each(|name| {
460            if !cache.contains_key(*name) {
461                cache.insert((*name).clone(), flag);
462            }
463        });
464        Ok(Some(flag))
465    } else {
466        Ok(None)
467    }
468}
469
470/// If the relevant variables are set, the timeout is enforced for all PostgreSQL statements.
471/// For MySQL, it applies only to read-only statements.
472fn derive_timeout(stmt: &Statement, query_ctx: &QueryContextRef) -> Option<Duration> {
473    let query_timeout = query_ctx.query_timeout()?;
474    if query_timeout.is_zero() {
475        return None;
476    }
477    match query_ctx.channel() {
478        Channel::Mysql if stmt.is_readonly() => Some(query_timeout),
479        Channel::Postgres => Some(query_timeout),
480        _ => None,
481    }
482}
483
/// Applies the remaining timeout budget to an [`Output`]. Non-streaming
/// outputs pass through unchanged; streaming outputs are wrapped so each
/// `next()` call is bounded by the budget left, which is decremented as
/// batches are produced. Fails immediately if the budget is already zero.
fn attach_timeout(output: Output, mut timeout: Duration) -> Result<Output> {
    if timeout.is_zero() {
        return StatementTimeoutSnafu.fail();
    }

    let output = match output.data {
        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
        OutputData::Stream(mut stream) => {
            let schema = stream.schema();
            let s = Box::pin(stream! {
                let mut start = tokio::time::Instant::now();
                // Each poll of the inner stream gets only the remaining budget.
                while let Some(item) = tokio::time::timeout(timeout, stream.next()).await.map_err(|_| StreamTimeoutSnafu.build())? {
                    yield item;

                    // Subtract the time spent producing this item from the budget
                    // (saturating at zero).
                    let now = tokio::time::Instant::now();
                    timeout = timeout.checked_sub(now - start).unwrap_or(Duration::ZERO);
                    start = now;
                    // tokio::time::timeout may not return an error immediately when timeout is 0.
                    if timeout.is_zero() {
                        StreamTimeoutSnafu.fail()?;
                    }
                }
            }) as Pin<Box<dyn Stream<Item = _> + Send>>;
            let stream = RecordBatchStreamWrapper {
                schema,
                stream: s,
                output_ordering: None,
                metrics: Default::default(),
                span: Span::current(),
            };
            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
        }
    };

    Ok(output)
}
520
impl Instance {
    /// Inner implementation of [`SqlQueryHandler::do_query`]: parses the raw
    /// query into statements and executes them sequentially, stopping at the
    /// first permission or execution error. Returns one result per statement
    /// attempted.
    #[tracing::instrument(skip_all, name = "SqlQueryHandler::do_query")]
    async fn do_query_inner(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
        // Reject queries outright while the instance is suspended.
        if self.is_suspended() {
            return vec![error::SuspendedSnafu {}.fail()];
        }

        let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
        let query_interceptor = query_interceptor_opt.as_ref();
        // The interceptor may rewrite the raw SQL before parsing.
        let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) {
            Ok(q) => q,
            Err(e) => return vec![Err(e)],
        };

        let checker_ref = self.plugins.get::<PermissionCheckerRef>();
        let checker = checker_ref.as_ref();

        match parse_stmt(query.as_ref(), query_ctx.sql_dialect())
            .and_then(|stmts| query_interceptor.post_parsing(stmts, query_ctx.clone()))
        {
            Ok(stmts) => {
                if stmts.is_empty() {
                    return vec![
                        InvalidSqlSnafu {
                            err_msg: "empty statements",
                        }
                        .fail(),
                    ];
                }

                let mut results = Vec::with_capacity(stmts.len());
                for stmt in stmts {
                    // Per-statement permission check; a failure aborts the
                    // remainder of the batch.
                    if let Err(e) = checker
                        .check_permission(
                            query_ctx.current_user(),
                            PermissionReq::SqlStatement(&stmt),
                        )
                        .context(PermissionSnafu)
                    {
                        results.push(Err(e));
                        break;
                    }

                    match self.query_statement(stmt.clone(), query_ctx.clone()).await {
                        Ok(output) => {
                            let output_result =
                                query_interceptor.post_execute(output, query_ctx.clone());
                            results.push(output_result);
                        }
                        Err(e) => {
                            // Log severity depends on the error's status code;
                            // execution stops at the first failed statement.
                            if e.status_code().should_log_error() {
                                error!(e; "Failed to execute query: {stmt}");
                            } else {
                                debug!("Failed to execute query: {stmt}, {e}");
                            }
                            results.push(Err(e));
                            break;
                        }
                    }
                }
                results
            }
            Err(e) => {
                vec![Err(e)]
            }
        }
    }

    /// Inner implementation of [`SqlQueryHandler::do_exec_plan`]: executes an
    /// already-built logical plan. When the originating statement is provided
    /// and should be captured, the execution is registered with the process
    /// manager (cancellable, slow-query timed) like a regular query.
    async fn do_exec_plan_inner(
        &self,
        stmt: Option<Statement>,
        plan: LogicalPlan,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        ensure!(!self.is_suspended(), error::SuspendedSnafu);

        if should_capture_statement(stmt.as_ref()) {
            // It's safe to unwrap here because we've already checked the type.
            let stmt = stmt.unwrap();
            let query = stmt.to_string();
            // Slow-query timing only when enabled AND an event recorder exists.
            let slow_query_timer = self
                .slow_query_options
                .enable
                .then(|| self.event_recorder.clone())
                .flatten()
                .map(|event_recorder| {
                    SlowQueryTimer::new(
                        CatalogQueryStatement::Sql(stmt.clone()),
                        self.slow_query_options.threshold,
                        self.slow_query_options.sample_ratio,
                        self.slow_query_options.record_type,
                        event_recorder,
                    )
                });

            let ticket = self.process_manager.register_query(
                query_ctx.current_catalog().to_string(),
                vec![query_ctx.current_schema()],
                query,
                query_ctx.conn_info().to_string(),
                Some(query_ctx.process_id()),
                slow_query_timer,
            );

            let query_fut = self.query_engine.execute(plan.clone(), query_ctx);

            // Cancellable execution; streaming outputs are wrapped so
            // cancellation also stops an in-flight stream.
            CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
                .await
                .map_err(|_| error::CancelledSnafu.build())?
                .map(|output| {
                    let Output { meta, data } = output;

                    let data = match data {
                        OutputData::Stream(stream) => OutputData::Stream(Box::pin(
                            CancellableStreamWrapper::new(stream, ticket),
                        )),
                        other => other,
                    };
                    Output { data, meta }
                })
                .context(ExecLogicalPlanSnafu)
        } else {
            // plan should be prepared before exec
            // we'll do check there
            self.query_engine
                .execute(plan.clone(), query_ctx)
                .await
                .context(ExecLogicalPlanSnafu)
        }
    }

    /// Inner implementation of [`SqlQueryHandler::do_promql_query`]: delegates
    /// to the [`PrometheusHandler`] implementation and wraps the single result
    /// in a `Vec` to match the SQL handler's shape.
    #[tracing::instrument(skip_all, name = "SqlQueryHandler::do_promql_query")]
    async fn do_promql_query_inner(
        &self,
        query: &PromQuery,
        query_ctx: QueryContextRef,
    ) -> Vec<Result<Output>> {
        if self.is_suspended() {
            return vec![error::SuspendedSnafu {}.fail()];
        }

        // check will be done in prometheus handler's do_query
        let result = PrometheusHandler::do_query(self, query, query_ctx)
            .await
            .with_context(|_| ExecutePromqlSnafu {
                query: format!("{query:?}"),
            });
        vec![result]
    }

    /// Inner implementation of [`SqlQueryHandler::do_describe`]: plans
    /// INSERT/QUERY/DELETE statements (after a permission check) and asks the
    /// query engine to describe the plan. Other statement kinds yield `None`.
    async fn do_describe_inner(
        &self,
        stmt: Statement,
        query_ctx: QueryContextRef,
    ) -> Result<Option<DescribeResult>> {
        ensure!(!self.is_suspended(), error::SuspendedSnafu);

        if matches!(
            stmt,
            Statement::Insert(_) | Statement::Query(_) | Statement::Delete(_)
        ) {
            self.plugins
                .get::<PermissionCheckerRef>()
                .as_ref()
                .check_permission(query_ctx.current_user(), PermissionReq::SqlStatement(&stmt))
                .context(PermissionSnafu)?;

            let plan = self
                .query_engine
                .planner()
                .plan(&QueryStatement::Sql(stmt), query_ctx.clone())
                .await
                .context(PlanStatementSnafu)?;
            self.query_engine
                .describe(plan, query_ctx)
                .await
                .map(Some)
                .context(error::DescribeStatementSnafu)
        } else {
            Ok(None)
        }
    }

    /// Inner implementation of [`SqlQueryHandler::is_valid_schema`]: checks
    /// schema existence via the catalog manager.
    async fn is_valid_schema_inner(&self, catalog: &str, schema: &str) -> Result<bool> {
        self.catalog_manager
            .schema_exists(catalog, schema, None)
            .await
            .context(error::CatalogSnafu)
    }
}
711
#[async_trait]
impl SqlQueryHandler for Instance {
    /// Delegates to [`Instance::do_query_inner`], converting each frontend
    /// error into a boxed server error.
    async fn do_query(
        &self,
        query: &str,
        query_ctx: QueryContextRef,
    ) -> Vec<server_error::Result<Output>> {
        self.do_query_inner(query, query_ctx)
            .await
            .into_iter()
            .map(|result| result.map_err(BoxedError::new).context(ExecuteQuerySnafu))
            .collect()
    }

    /// Delegates to [`Instance::do_exec_plan_inner`], boxing errors for the
    /// server layer.
    async fn do_exec_plan(
        &self,
        stmt: Option<Statement>,
        plan: LogicalPlan,
        query_ctx: QueryContextRef,
    ) -> server_error::Result<Output> {
        self.do_exec_plan_inner(stmt, plan, query_ctx)
            .await
            .map_err(BoxedError::new)
            .context(server_error::ExecutePlanSnafu)
    }

    /// Delegates to [`Instance::do_promql_query_inner`], boxing errors for the
    /// server layer.
    async fn do_promql_query(
        &self,
        query: &PromQuery,
        query_ctx: QueryContextRef,
    ) -> Vec<server_error::Result<Output>> {
        self.do_promql_query_inner(query, query_ctx)
            .await
            .into_iter()
            .map(|result| result.map_err(BoxedError::new).context(ExecuteQuerySnafu))
            .collect()
    }

    /// Delegates to [`Instance::do_describe_inner`], boxing errors for the
    /// server layer.
    async fn do_describe(
        &self,
        stmt: Statement,
        query_ctx: QueryContextRef,
    ) -> server_error::Result<Option<DescribeResult>> {
        self.do_describe_inner(stmt, query_ctx)
            .await
            .map_err(BoxedError::new)
            .context(server_error::DescribeStatementSnafu)
    }

    /// Delegates to [`Instance::is_valid_schema_inner`], boxing errors for the
    /// server layer.
    async fn is_valid_schema(&self, catalog: &str, schema: &str) -> server_error::Result<bool> {
        self.is_valid_schema_inner(catalog, schema)
            .await
            .map_err(BoxedError::new)
            .context(server_error::CheckDatabaseValiditySnafu)
    }
}
768
769/// Attaches a timer to the output and observes it once the output is exhausted.
770pub fn attach_timer(output: Output, timer: HistogramTimer) -> Output {
771    match output.data {
772        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
773        OutputData::Stream(stream) => {
774            let stream = OnDone::new(stream, move || {
775                timer.observe_duration();
776            });
777            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
778        }
779    }
780}
781
782#[async_trait]
783impl PrometheusHandler for Instance {
784    #[tracing::instrument(skip_all)]
785    async fn do_query(
786        &self,
787        query: &PromQuery,
788        query_ctx: QueryContextRef,
789    ) -> server_error::Result<Output> {
790        let interceptor = self
791            .plugins
792            .get::<PromQueryInterceptorRef<server_error::Error>>();
793
794        self.plugins
795            .get::<PermissionCheckerRef>()
796            .as_ref()
797            .check_permission(query_ctx.current_user(), PermissionReq::PromQuery)
798            .context(AuthSnafu)?;
799
800        let stmt = QueryLanguageParser::parse_promql(query, &query_ctx).with_context(|_| {
801            ParsePromQLSnafu {
802                query: query.clone(),
803            }
804        })?;
805
806        let plan = self
807            .statement_executor
808            .plan(&stmt, query_ctx.clone())
809            .await
810            .map_err(BoxedError::new)
811            .context(ExecuteQuerySnafu)?;
812
813        interceptor.pre_execute(query, Some(&plan), query_ctx.clone())?;
814
815        // Take the EvalStmt from the original QueryStatement and use it to create the CatalogQueryStatement.
816        let query_statement = if let QueryStatement::Promql(eval_stmt, alias) = stmt {
817            CatalogQueryStatement::Promql(eval_stmt, alias)
818        } else {
819            // It should not happen since the query is already parsed successfully.
820            return UnexpectedResultSnafu {
821                reason: "The query should always be promql.".to_string(),
822            }
823            .fail();
824        };
825        let query = query_statement.to_string();
826
827        let slow_query_timer = self
828            .slow_query_options
829            .enable
830            .then(|| self.event_recorder.clone())
831            .flatten()
832            .map(|event_recorder| {
833                SlowQueryTimer::new(
834                    query_statement,
835                    self.slow_query_options.threshold,
836                    self.slow_query_options.sample_ratio,
837                    self.slow_query_options.record_type,
838                    event_recorder,
839                )
840            });
841
842        let ticket = self.process_manager.register_query(
843            query_ctx.current_catalog().to_string(),
844            vec![query_ctx.current_schema()],
845            query,
846            query_ctx.conn_info().to_string(),
847            Some(query_ctx.process_id()),
848            slow_query_timer,
849        );
850
851        let query_fut = self.statement_executor.exec_plan(plan, query_ctx.clone());
852
853        let output = CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
854            .await
855            .map_err(|_| servers::error::CancelledSnafu.build())?
856            .map(|output| {
857                let Output { meta, data } = output;
858                let data = match data {
859                    OutputData::Stream(stream) => {
860                        OutputData::Stream(Box::pin(CancellableStreamWrapper::new(stream, ticket)))
861                    }
862                    other => other,
863                };
864                Output { data, meta }
865            })
866            .map_err(BoxedError::new)
867            .context(ExecuteQuerySnafu)?;
868
869        Ok(interceptor.post_execute(output, query_ctx)?)
870    }
871
872    async fn query_metric_names(
873        &self,
874        matchers: Vec<Matcher>,
875        ctx: &QueryContextRef,
876    ) -> server_error::Result<Vec<String>> {
877        self.handle_query_metric_names(matchers, ctx)
878            .await
879            .map_err(BoxedError::new)
880            .context(ExecuteQuerySnafu)
881    }
882
883    async fn query_label_values(
884        &self,
885        metric: String,
886        label_name: String,
887        matchers: Vec<Matcher>,
888        start: SystemTime,
889        end: SystemTime,
890        ctx: &QueryContextRef,
891    ) -> server_error::Result<Vec<String>> {
892        self.handle_query_label_values(metric, label_name, matchers, start, end, ctx)
893            .await
894            .map_err(BoxedError::new)
895            .context(ExecuteQuerySnafu)
896    }
897
    /// Returns a shared handle to the catalog manager backing this instance.
    fn catalog_manager(&self) -> CatalogManagerRef {
        self.catalog_manager.clone()
    }
901}
902
/// Validate `stmt.database` permission if it's presented.
///
/// Expands to a check that only runs when the statement carries an explicit
/// `database` field; statements without one are accepted unchanged. On
/// failure the error is boxed and surfaced as `SqlExecIntercepted`, and the
/// enclosing function returns early via `?`.
macro_rules! validate_db_permission {
    ($stmt: expr, $query_ctx: expr) => {
        if let Some(database) = &$stmt.database {
            validate_catalog_and_schema($query_ctx.current_catalog(), database, $query_ctx)
                .map_err(BoxedError::new)
                .context(SqlExecInterceptedSnafu)?;
        }
    };
}
913
/// Checks whether `stmt` is allowed to execute under `query_ctx`.
///
/// The check is a no-op (always `Ok`) unless the `disallow_cross_catalog_query`
/// option is enabled via `plugins`. When enabled, statements that name a
/// table/database/flow are validated against the session's current catalog
/// (and schema where applicable); statements that are checked elsewhere
/// (query engine, execution layer) or are intentionally unrestricted fall
/// through without validation.
pub fn check_permission(
    plugins: Plugins,
    stmt: &Statement,
    query_ctx: &QueryContextRef,
) -> Result<()> {
    // Absent QueryOptions means the default (no cross-catalog restriction).
    let need_validate = plugins
        .get::<QueryOptions>()
        .map(|opts| opts.disallow_cross_catalog_query)
        .unwrap_or_default();

    if !need_validate {
        return Ok(());
    }

    match stmt {
        // Will be checked in execution.
        // TODO(dennis): add a hook for admin commands.
        Statement::Admin(_) => {}
        // These are executed by query engine, and will be checked there.
        Statement::Query(_)
        | Statement::Explain(_)
        | Statement::Tql(_)
        | Statement::Delete(_)
        | Statement::DeclareCursor(_)
        | Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(_)) => {}
        // database ops won't be checked
        Statement::CreateDatabase(_)
        | Statement::ShowDatabases(_)
        | Statement::DropDatabase(_)
        | Statement::AlterDatabase(_)
        | Statement::DropFlow(_)
        | Statement::Use(_) => {}
        #[cfg(feature = "enterprise")]
        Statement::DropTrigger(_) => {}
        Statement::ShowCreateDatabase(stmt) => {
            validate_database(&stmt.database_name, query_ctx)?;
        }
        Statement::ShowCreateTable(stmt) => {
            validate_param(&stmt.table_name, query_ctx)?;
        }
        Statement::ShowCreateFlow(stmt) => {
            validate_flow(&stmt.flow_name, query_ctx)?;
        }
        #[cfg(feature = "enterprise")]
        Statement::ShowCreateTrigger(stmt) => {
            validate_param(&stmt.trigger_name, query_ctx)?;
        }
        Statement::ShowCreateView(stmt) => {
            validate_param(&stmt.view_name, query_ctx)?;
        }
        Statement::CreateExternalTable(stmt) => {
            validate_param(&stmt.name, query_ctx)?;
        }
        Statement::CreateFlow(stmt) => {
            // TODO: should also validate source table name here?
            validate_param(&stmt.sink_table_name, query_ctx)?;
        }
        #[cfg(feature = "enterprise")]
        Statement::CreateTrigger(stmt) => {
            validate_param(&stmt.trigger_name, query_ctx)?;
        }
        Statement::CreateView(stmt) => {
            validate_param(&stmt.name, query_ctx)?;
        }
        Statement::AlterTable(stmt) => {
            validate_param(stmt.table_name(), query_ctx)?;
        }
        #[cfg(feature = "enterprise")]
        Statement::AlterTrigger(_) => {}
        // set/show variable now only alter/show variable in session
        Statement::SetVariables(_) | Statement::ShowVariables(_) => {}
        // show charset and show collation won't be checked
        Statement::ShowCharset(_) | Statement::ShowCollation(_) => {}

        Statement::Comment(comment) => match &comment.object {
            CommentObject::Table(table) => validate_param(table, query_ctx)?,
            CommentObject::Column { table, .. } => validate_param(table, query_ctx)?,
            CommentObject::Flow(flow) => validate_flow(flow, query_ctx)?,
        },

        Statement::Insert(insert) => {
            let name = insert.table_name().context(ParseSqlSnafu)?;
            validate_param(name, query_ctx)?;
        }
        Statement::CreateTable(stmt) => {
            validate_param(&stmt.name, query_ctx)?;
        }
        Statement::CreateTableLike(stmt) => {
            // Both the new table and the table it is modeled after must be accessible.
            validate_param(&stmt.table_name, query_ctx)?;
            validate_param(&stmt.source_name, query_ctx)?;
        }
        Statement::DropTable(drop_stmt) => {
            for table_name in drop_stmt.table_names() {
                validate_param(table_name, query_ctx)?;
            }
        }
        Statement::DropView(stmt) => {
            validate_param(&stmt.view_name, query_ctx)?;
        }
        Statement::ShowTables(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowTableStatus(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowColumns(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowIndex(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowRegion(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowViews(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowFlows(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        #[cfg(feature = "enterprise")]
        Statement::ShowTriggers(_stmt) => {
            // The trigger is organized based on the catalog dimension, so there
            // is no need to check the permission of the database(schema).
        }
        Statement::ShowStatus(_stmt) => {}
        Statement::ShowSearchPath(_stmt) => {}
        Statement::DescribeTable(stmt) => {
            validate_param(stmt.name(), query_ctx)?;
        }
        Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => match stmt {
            CopyTable::To(copy_table_to) => validate_param(&copy_table_to.table_name, query_ctx)?,
            CopyTable::From(copy_table_from) => {
                validate_param(&copy_table_from.table_name, query_ctx)?
            }
        },
        Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
            match copy_database {
                CopyDatabase::To(stmt) => validate_database(&stmt.database_name, query_ctx)?,
                CopyDatabase::From(stmt) => validate_database(&stmt.database_name, query_ctx)?,
            }
        }
        Statement::TruncateTable(stmt) => {
            validate_param(stmt.table_name(), query_ctx)?;
        }
        // cursor operations are always allowed once it's created
        Statement::FetchCursor(_) | Statement::CloseCursor(_) => {}
        // User can only kill process in their own catalog.
        Statement::Kill(_) => {}
        // SHOW PROCESSLIST
        Statement::ShowProcesslist(_) => {}
    }
    Ok(())
}
1068
1069fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
1070    let (catalog, schema, _) = table_idents_to_full_name(name, query_ctx)
1071        .map_err(BoxedError::new)
1072        .context(ExternalSnafu)?;
1073
1074    validate_catalog_and_schema(&catalog, &schema, query_ctx)
1075        .map_err(BoxedError::new)
1076        .context(SqlExecInterceptedSnafu)
1077}
1078
1079fn validate_flow(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
1080    let catalog = match &name.0[..] {
1081        [_flow] => query_ctx.current_catalog().to_string(),
1082        [catalog, _flow] => catalog.to_string_unquoted(),
1083        _ => {
1084            return InvalidSqlSnafu {
1085                err_msg: format!(
1086                    "expect flow name to be <catalog>.<flow_name> or <flow_name>, actual: {name}",
1087                ),
1088            }
1089            .fail();
1090        }
1091    };
1092
1093    let schema = query_ctx.current_schema();
1094
1095    validate_catalog_and_schema(&catalog, &schema, query_ctx)
1096        .map_err(BoxedError::new)
1097        .context(SqlExecInterceptedSnafu)
1098}
1099
1100fn validate_database(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
1101    let (catalog, schema) = match &name.0[..] {
1102        [schema] => (
1103            query_ctx.current_catalog().to_string(),
1104            schema.to_string_unquoted(),
1105        ),
1106        [catalog, schema] => (catalog.to_string_unquoted(), schema.to_string_unquoted()),
1107        _ => InvalidSqlSnafu {
1108            err_msg: format!(
1109                "expect database name to be <catalog>.<schema> or <schema>, actual: {name}",
1110            ),
1111        }
1112        .fail()?,
1113    };
1114
1115    validate_catalog_and_schema(&catalog, &schema, query_ctx)
1116        .map_err(BoxedError::new)
1117        .context(SqlExecInterceptedSnafu)
1118}
1119
1120// Create a query ticket and slow query timer if the statement is a query or readonly statement.
1121fn should_capture_statement(stmt: Option<&Statement>) -> bool {
1122    if let Some(stmt) = stmt {
1123        matches!(stmt, Statement::Query(_)) || stmt.is_readonly()
1124    } else {
1125        false
1126    }
1127}
1128
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::{Arc, Barrier};
    use std::thread;
    use std::time::{Duration, Instant};

    use common_base::Plugins;
    use query::query_engine::options::QueryOptions;
    use session::context::QueryContext;
    use sql::dialect::GreptimeDbDialect;
    use strfmt::Format;

    use super::*;

    /// Exercises `fast_legacy_check` with cache hits, cache misses,
    /// incompatible-mode errors, and heavy concurrent access to verify its
    /// DashMap usage cannot deadlock.
    #[test]
    fn test_fast_legacy_check_deadlock_prevention() {
        // Create a DashMap to simulate the cache
        let cache = DashMap::new();

        // Pre-populate cache with some entries
        cache.insert("metric1".to_string(), true); // legacy mode
        cache.insert("metric2".to_string(), false); // prom mode
        cache.insert("metric3".to_string(), true); // legacy mode

        // Test case 1: Normal operation with cache hits
        let metric1 = "metric1".to_string();
        let metric4 = "metric4".to_string();
        let names1 = vec![&metric1, &metric4];
        let result = fast_legacy_check(&cache, &names1);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), Some(true)); // should return legacy mode

        // Verify that metric4 was added to cache
        assert!(cache.contains_key("metric4"));
        assert!(*cache.get("metric4").unwrap().value());

        // Test case 2: No cache hits
        let metric5 = "metric5".to_string();
        let metric6 = "metric6".to_string();
        let names2 = vec![&metric5, &metric6];
        let result = fast_legacy_check(&cache, &names2);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), None); // should return None as no cache hits

        // Test case 3: Incompatible modes should return error
        let cache_incompatible = DashMap::new();
        cache_incompatible.insert("metric1".to_string(), true); // legacy
        cache_incompatible.insert("metric2".to_string(), false); // prom
        let metric1_test = "metric1".to_string();
        let metric2_test = "metric2".to_string();
        let names3 = vec![&metric1_test, &metric2_test];
        let result = fast_legacy_check(&cache_incompatible, &names3);
        assert!(result.is_err()); // should error due to incompatible modes

        // Test case 4: Intensive concurrent access to test deadlock prevention
        // This test specifically targets the scenario where multiple threads
        // access the same cache entries simultaneously
        let cache_concurrent = Arc::new(DashMap::new());
        cache_concurrent.insert("shared_metric".to_string(), true);

        let num_threads = 8;
        let operations_per_thread = 100;
        let barrier = Arc::new(Barrier::new(num_threads));
        let success_flag = Arc::new(AtomicBool::new(true));

        let handles: Vec<_> = (0..num_threads)
            .map(|thread_id| {
                let cache_clone = Arc::clone(&cache_concurrent);
                let barrier_clone = Arc::clone(&barrier);
                let success_flag_clone = Arc::clone(&success_flag);

                thread::spawn(move || {
                    // Wait for all threads to be ready
                    barrier_clone.wait();

                    let start_time = Instant::now();
                    for i in 0..operations_per_thread {
                        // Each operation references existing cache entry and adds new ones
                        let shared_metric = "shared_metric".to_string();
                        let new_metric = format!("thread_{}_metric_{}", thread_id, i);
                        let names = vec![&shared_metric, &new_metric];

                        match fast_legacy_check(&cache_clone, &names) {
                            Ok(_) => {}
                            Err(_) => {
                                success_flag_clone.store(false, Ordering::Relaxed);
                                return;
                            }
                        }

                        // If the test takes too long, it likely means deadlock
                        if start_time.elapsed() > Duration::from_secs(10) {
                            success_flag_clone.store(false, Ordering::Relaxed);
                            return;
                        }
                    }
                })
            })
            .collect();

        // Join all threads with timeout
        let start_time = Instant::now();
        for (i, handle) in handles.into_iter().enumerate() {
            let join_result = handle.join();

            // Check if we're taking too long (potential deadlock)
            if start_time.elapsed() > Duration::from_secs(30) {
                panic!("Test timed out - possible deadlock detected!");
            }

            if join_result.is_err() {
                panic!("Thread {} panicked during execution", i);
            }
        }

        // Verify all operations completed successfully
        assert!(
            success_flag.load(Ordering::Relaxed),
            "Some operations failed"
        );

        // Verify that many new entries were added (proving operations completed)
        let final_count = cache_concurrent.len();
        assert!(
            final_count > 1 + num_threads * operations_per_thread / 2,
            "Expected more cache entries, got {}",
            final_count
        );
    }

    /// Validates `check_permission` across statement kinds: allowed
    /// same-catalog statements, rejected cross-catalog ones, and the
    /// flow-specific name forms.
    #[test]
    fn test_exec_validation() {
        let query_ctx = QueryContext::arc();
        let plugins: Plugins = Plugins::new();
        // Enable the cross-catalog restriction so validation actually runs.
        plugins.insert(QueryOptions {
            disallow_cross_catalog_query: true,
        });

        let sql = r#"
        SELECT * FROM demo;
        EXPLAIN SELECT * FROM demo;
        CREATE DATABASE test_database;
        SHOW DATABASES;
        "#;
        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        assert_eq!(stmts.len(), 4);
        for stmt in stmts {
            let re = check_permission(plugins.clone(), &stmt, &query_ctx);
            re.unwrap();
        }

        let sql = r#"
        SHOW CREATE TABLE demo;
        ALTER TABLE demo ADD COLUMN new_col INT;
        "#;
        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        assert_eq!(stmts.len(), 2);
        for stmt in stmts {
            let re = check_permission(plugins.clone(), &stmt, &query_ctx);
            re.unwrap();
        }

        // Runs a templated SQL against accepted and rejected catalog/schema prefixes.
        fn replace_test(template_sql: &str, plugins: Plugins, query_ctx: &QueryContextRef) {
            // test right
            let right = vec![("", ""), ("", "public."), ("greptime.", "public.")];
            for (catalog, schema) in right {
                let sql = do_fmt(template_sql, catalog, schema);
                do_test(&sql, plugins.clone(), query_ctx, true);
            }

            let wrong = vec![
                ("wrongcatalog.", "public."),
                ("wrongcatalog.", "wrongschema."),
            ];
            for (catalog, schema) in wrong {
                let sql = do_fmt(template_sql, catalog, schema);
                do_test(&sql, plugins.clone(), query_ctx, false);
            }
        }

        // Substitutes `{catalog}`/`{schema}` placeholders in the template.
        fn do_fmt(template: &str, catalog: &str, schema: &str) -> String {
            let vars = HashMap::from([
                ("catalog".to_string(), catalog),
                ("schema".to_string(), schema),
            ]);
            template.format(&vars).unwrap()
        }

        // Parses the first statement of `sql` and asserts the permission outcome.
        fn do_test(sql: &str, plugins: Plugins, query_ctx: &QueryContextRef, is_ok: bool) {
            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
            let re = check_permission(plugins, stmt, query_ctx);
            if is_ok {
                re.unwrap();
            } else {
                assert!(re.is_err());
            }
        }

        // test insert
        let sql = "INSERT INTO {catalog}{schema}monitor(host) VALUES ('host1');";
        replace_test(sql, plugins.clone(), &query_ctx);

        // test create table
        let sql = r#"CREATE TABLE {catalog}{schema}demo(
                            host STRING,
                            ts TIMESTAMP,
                            TIME INDEX (ts),
                            PRIMARY KEY(host)
                        ) engine=mito;"#;
        replace_test(sql, plugins.clone(), &query_ctx);

        // test drop table
        let sql = "DROP TABLE {catalog}{schema}demo;";
        replace_test(sql, plugins.clone(), &query_ctx);

        // test show tables
        let sql = "SHOW TABLES FROM public";
        let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        check_permission(plugins.clone(), &stmt[0], &query_ctx).unwrap();

        let sql = "SHOW TABLES FROM private";
        let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        let re = check_permission(plugins.clone(), &stmt[0], &query_ctx);
        assert!(re.is_ok());

        // test describe table
        let sql = "DESC TABLE {catalog}{schema}demo;";
        replace_test(sql, plugins.clone(), &query_ctx);

        let comment_flow_cases = [
            ("COMMENT ON FLOW my_flow IS 'comment';", true),
            ("COMMENT ON FLOW greptime.my_flow IS 'comment';", true),
            ("COMMENT ON FLOW wrongcatalog.my_flow IS 'comment';", false),
        ];
        for (sql, is_ok) in comment_flow_cases {
            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
            let result = check_permission(plugins.clone(), stmt, &query_ctx);
            assert_eq!(result.is_ok(), is_ok);
        }

        let show_flow_cases = [
            ("SHOW CREATE FLOW my_flow;", true),
            ("SHOW CREATE FLOW greptime.my_flow;", true),
            ("SHOW CREATE FLOW wrongcatalog.my_flow;", false),
        ];
        for (sql, is_ok) in show_flow_cases {
            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
            let result = check_permission(plugins.clone(), stmt, &query_ctx);
            assert_eq!(result.is_ok(), is_ok);
        }
    }
}