frontend/instance.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod builder;
mod grpc;
mod influxdb;
mod jaeger;
mod log_handler;
mod logs;
mod opentsdb;
mod otlp;
pub mod prom_store;
mod promql;
mod region_query;
pub mod standalone;

use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use async_stream::stream;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use catalog::CatalogManagerRef;
use catalog::process_manager::{
    ProcessManagerRef, QueryStatement as CatalogQueryStatement, SlowQueryTimer,
};
use client::OutputData;
use common_base::Plugins;
use common_base::cancellation::CancellableFuture;
use common_error::ext::{BoxedError, ErrorExt};
use common_event_recorder::EventRecorderRef;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::table_name::TableNameKey;
use common_meta::node_manager::NodeManagerRef;
use common_meta::procedure_executor::ProcedureExecutorRef;
use common_query::Output;
use common_recordbatch::RecordBatchStreamWrapper;
use common_recordbatch::error::StreamTimeoutSnafu;
use common_telemetry::logging::SlowQueryOptions;
use common_telemetry::{debug, error, tracing};
use dashmap::DashMap;
use datafusion_expr::LogicalPlan;
use futures::{Stream, StreamExt};
use lazy_static::lazy_static;
use operator::delete::DeleterRef;
use operator::insert::InserterRef;
use operator::statement::{StatementExecutor, StatementExecutorRef};
use partition::manager::PartitionRuleManagerRef;
use pipeline::pipeline_operator::PipelineOperator;
use prometheus::HistogramTimer;
use promql_parser::label::Matcher;
use query::QueryEngineRef;
use query::metrics::OnDone;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::query_engine::DescribeResult;
use query::query_engine::options::{QueryOptions, validate_catalog_and_schema};
use servers::error::{
    self as server_error, AuthSnafu, CommonMetaSnafu, ExecuteQuerySnafu,
    OtlpMetricModeIncompatibleSnafu, ParsePromQLSnafu, UnexpectedResultSnafu,
};
use servers::interceptor::{
    PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
};
use servers::otlp::metrics::legacy_normalize_otlp_name;
use servers::prometheus_handler::PrometheusHandler;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::{Channel, QueryContextRef};
use session::table_name::table_idents_to_full_name;
use snafu::prelude::*;
use sql::ast::ObjectNamePartExt;
use sql::dialect::Dialect;
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::copy::{CopyDatabase, CopyTable};
use sql::statements::statement::Statement;
use sql::statements::tql::Tql;
use sqlparser::ast::ObjectName;
pub use standalone::StandaloneDatanodeManager;
use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM};

use crate::error::{
    self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, InvalidSqlSnafu,
    ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
    StatementTimeoutSnafu, TableOperationSnafu,
};
use crate::limiter::LimiterRef;
use crate::stream_wrapper::CancellableStreamWrapper;

lazy_static! {
    static ref OTLP_LEGACY_DEFAULT_VALUE: String = "legacy".to_string();
}

/// The frontend instance contains necessary components, and implements many
/// traits, like [`servers::query_handler::grpc::GrpcQueryHandler`],
/// [`servers::query_handler::sql::SqlQueryHandler`], etc.
#[derive(Clone)]
pub struct Instance {
    catalog_manager: CatalogManagerRef,
    pipeline_operator: Arc<PipelineOperator>,
    statement_executor: Arc<StatementExecutor>,
    query_engine: QueryEngineRef,
    plugins: Plugins,
    inserter: InserterRef,
    deleter: DeleterRef,
    table_metadata_manager: TableMetadataManagerRef,
    event_recorder: Option<EventRecorderRef>,
    limiter: Option<LimiterRef>,
    process_manager: ProcessManagerRef,
    slow_query_options: SlowQueryOptions,

    // Cache for OTLP metrics tables.
    // First-layer key: the db string.
    // Second-layer key: the metric name exactly as received in the request.
    // Value: whether the table runs in legacy mode.
    otlp_metrics_table_legacy_cache: DashMap<String, DashMap<String, bool>>,
}

impl Instance {
    pub fn catalog_manager(&self) -> &CatalogManagerRef {
        &self.catalog_manager
    }

    pub fn query_engine(&self) -> &QueryEngineRef {
        &self.query_engine
    }

    pub fn plugins(&self) -> &Plugins {
        &self.plugins
    }

    pub fn statement_executor(&self) -> &StatementExecutorRef {
        &self.statement_executor
    }

    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
        &self.table_metadata_manager
    }

    pub fn inserter(&self) -> &InserterRef {
        &self.inserter
    }

    pub fn process_manager(&self) -> &ProcessManagerRef {
        &self.process_manager
    }

    pub fn node_manager(&self) -> &NodeManagerRef {
        self.inserter.node_manager()
    }

    pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
        self.inserter.partition_manager()
    }

    pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
        self.statement_executor.cache_invalidator()
    }

    pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
        self.statement_executor.procedure_executor()
    }
}
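
/// Parses a raw SQL string into a list of statements using the given dialect.
///
/// A minimal usage sketch (the dialect here is illustrative; see the tests below):
/// ```ignore
/// let stmts = parse_stmt("SELECT 1;", &GreptimeDbDialect {})?;
/// assert_eq!(stmts.len(), 1);
/// ```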
fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
    ParserContext::create_with_dialect(sql, dialect, ParseOptions::default()).context(ParseSqlSnafu)
}

impl Instance {
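    /// Executes a single parsed statement. Permissions are checked first;
    /// queries and other read-only statements are additionally registered
    /// with the process manager so they can be cancelled and tracked by the
    /// slow query timer.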
    async fn query_statement(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
        check_permission(self.plugins.clone(), &stmt, &query_ctx)?;

        let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
        let query_interceptor = query_interceptor.as_ref();

        if should_capture_statement(Some(&stmt)) {
            let slow_query_timer = self
                .slow_query_options
                .enable
                .then(|| self.event_recorder.clone())
                .flatten()
                .map(|event_recorder| {
                    SlowQueryTimer::new(
                        CatalogQueryStatement::Sql(stmt.clone()),
                        self.slow_query_options.threshold,
                        self.slow_query_options.sample_ratio,
                        self.slow_query_options.record_type,
                        event_recorder,
                    )
                });

            let ticket = self.process_manager.register_query(
                query_ctx.current_catalog().to_string(),
                vec![query_ctx.current_schema()],
                stmt.to_string(),
                query_ctx.conn_info().to_string(),
                Some(query_ctx.process_id()),
                slow_query_timer,
            );

            let query_fut = self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor);

            CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
                .await
                .map_err(|_| error::CancelledSnafu.build())?
                .map(|output| {
                    let Output { meta, data } = output;

                    let data = match data {
                        OutputData::Stream(stream) => OutputData::Stream(Box::pin(
                            CancellableStreamWrapper::new(stream, ticket),
                        )),
                        other => other,
                    };
                    Output { data, meta }
                })
        } else {
            self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor)
                .await
        }
    }
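
    /// Executes the statement, enforcing the session's query timeout when one
    /// applies to the current channel (see [`derive_timeout`]).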
    async fn exec_statement_with_timeout(
        &self,
        stmt: Statement,
        query_ctx: QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
    ) -> Result<Output> {
        let timeout = derive_timeout(&stmt, &query_ctx);
        match timeout {
            Some(timeout) => {
                let start = tokio::time::Instant::now();
                let output = tokio::time::timeout(
                    timeout,
                    self.exec_statement(stmt, query_ctx, query_interceptor),
                )
                .await
                .map_err(|_| StatementTimeoutSnafu.build())??;
                // compute remaining timeout
                let remaining_timeout = timeout.checked_sub(start.elapsed()).unwrap_or_default();
                attach_timeout(output, remaining_timeout)
            }
            None => {
                self.exec_statement(stmt, query_ctx, query_interceptor)
                    .await
            }
        }
    }
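
    /// Dispatches the statement: queries, EXPLAIN, and DELETE go through the
    /// SQL planner, TQL is planned separately, and everything else is handed
    /// directly to the statement executor.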
    async fn exec_statement(
        &self,
        stmt: Statement,
        query_ctx: QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
    ) -> Result<Output> {
        match stmt {
            Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
                // TODO: remove this when format is supported in datafusion
                if let Statement::Explain(explain) = &stmt
                    && let Some(format) = explain.format()
                {
                    query_ctx.set_explain_format(format.to_string());
                }

                self.plan_and_exec_sql(stmt, &query_ctx, query_interceptor)
                    .await
            }
            Statement::Tql(tql) => {
                self.plan_and_exec_tql(&query_ctx, query_interceptor, tql)
                    .await
            }
            _ => {
                query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?;
                self.statement_executor
                    .execute_sql(stmt, query_ctx)
                    .await
                    .context(TableOperationSnafu)
            }
        }
    }

    async fn plan_and_exec_sql(
        &self,
        stmt: Statement,
        query_ctx: &QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
    ) -> Result<Output> {
        let stmt = QueryStatement::Sql(stmt);
        let plan = self
            .statement_executor
            .plan(&stmt, query_ctx.clone())
            .await?;
        let QueryStatement::Sql(stmt) = stmt else {
            unreachable!()
        };
        query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?;
        self.statement_executor
            .exec_plan(plan, query_ctx.clone())
            .await
            .context(TableOperationSnafu)
    }

    async fn plan_and_exec_tql(
        &self,
        query_ctx: &QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
        tql: Tql,
    ) -> Result<Output> {
        let plan = self
            .statement_executor
            .plan_tql(tql.clone(), query_ctx)
            .await?;
        query_interceptor.pre_execute(&Statement::Tql(tql), Some(&plan), query_ctx.clone())?;
        self.statement_executor
            .exec_plan(plan, query_ctx.clone())
            .await
            .context(TableOperationSnafu)
    }
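
    /// Decides whether the given OTLP metric names should be handled in
    /// legacy mode. The per-db cache is consulted first; on a miss the table
    /// metadata is queried, and the result is written back to the cache.
    /// Mixing legacy-mode and prom-mode tables in one request is rejected.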
    async fn check_otlp_legacy(
        &self,
        names: &[&String],
        ctx: QueryContextRef,
    ) -> server_error::Result<bool> {
        let db_string = ctx.get_db_string();
        // fast cache check
        let cache = self
            .otlp_metrics_table_legacy_cache
            .entry(db_string.clone())
            .or_default();
        if let Some(flag) = fast_legacy_check(&cache, names)? {
            return Ok(flag);
        }
        // release cache reference to avoid lock contention
        drop(cache);

        let catalog = ctx.current_catalog();
        let schema = ctx.current_schema();

        // query legacy table names
        let normalized_names = names
            .iter()
            .map(|n| legacy_normalize_otlp_name(n))
            .collect::<Vec<_>>();
        let table_names = normalized_names
            .iter()
            .map(|n| TableNameKey::new(catalog, &schema, n))
            .collect::<Vec<_>>();
        let table_values = self
            .table_metadata_manager()
            .table_name_manager()
            .batch_get(table_names)
            .await
            .context(CommonMetaSnafu)?;
        let table_ids = table_values
            .into_iter()
            .filter_map(|v| v.map(|vi| vi.table_id()))
            .collect::<Vec<_>>();

        // No existing table was found, so use the new mode.
        if table_ids.is_empty() {
            let cache = self
                .otlp_metrics_table_legacy_cache
                .entry(db_string)
                .or_default();
            names.iter().for_each(|name| {
                cache.insert((*name).clone(), false);
            });
            return Ok(false);
        }

        // has existing table, check table options
        let table_infos = self
            .table_metadata_manager()
            .table_info_manager()
            .batch_get(&table_ids)
            .await
            .context(CommonMetaSnafu)?;
        let options = table_infos
            .values()
            .map(|info| {
                info.table_info
                    .meta
                    .options
                    .extra_options
                    .get(OTLP_METRIC_COMPAT_KEY)
                    .unwrap_or(&OTLP_LEGACY_DEFAULT_VALUE)
            })
            .collect::<Vec<_>>();
        let cache = self
            .otlp_metrics_table_legacy_cache
            .entry(db_string)
            .or_default();
        if !options.is_empty() {
            // check value consistency
            let has_prom = options.iter().any(|opt| *opt == OTLP_METRIC_COMPAT_PROM);
            let has_legacy = options
                .iter()
                .any(|opt| *opt == OTLP_LEGACY_DEFAULT_VALUE.as_str());
            ensure!(!(has_prom && has_legacy), OtlpMetricModeIncompatibleSnafu);
            let flag = has_legacy;
            names.iter().for_each(|name| {
                cache.insert((*name).clone(), flag);
            });
            Ok(flag)
        } else {
            // no table info, use new mode
            names.iter().for_each(|name| {
                cache.insert((*name).clone(), false);
            });
            Ok(false)
        }
    }
}
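
/// Checks the per-db cache for the given metric names. Returns `Some(flag)`
/// when at least one name is cached (backfilling the missing names with the
/// same flag), `None` on a complete miss, and an error when the cached
/// entries disagree about the mode.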
fn fast_legacy_check(
    cache: &DashMap<String, bool>,
    names: &[&String],
) -> server_error::Result<Option<bool>> {
    let hit_cache = names
        .iter()
        .filter_map(|name| cache.get(*name))
        .collect::<Vec<_>>();
    if !hit_cache.is_empty() {
        let hit_legacy = hit_cache.iter().any(|en| *en.value());
        let hit_prom = hit_cache.iter().any(|en| !*en.value());

        // Hits with both true and false mean this batch mixes legacy and new
        // mode tables. We cannot handle that case, so return an error.
        // TODO: add doc links to the error message.
        ensure!(!(hit_legacy && hit_prom), OtlpMetricModeIncompatibleSnafu);

        let flag = hit_legacy;
        // drop hit_cache to release references before inserting to avoid deadlock
        drop(hit_cache);

        // set cache for all names
        names.iter().for_each(|name| {
            if !cache.contains_key(*name) {
                cache.insert((*name).clone(), flag);
            }
        });
        Ok(Some(flag))
    } else {
        Ok(None)
    }
}

/// If the relevant variables are set, the timeout is enforced for all PostgreSQL statements.
/// For MySQL, it applies only to read-only statements.
fn derive_timeout(stmt: &Statement, query_ctx: &QueryContextRef) -> Option<Duration> {
    let query_timeout = query_ctx.query_timeout()?;
    if query_timeout.is_zero() {
        return None;
    }
    match query_ctx.channel() {
        Channel::Mysql if stmt.is_readonly() => Some(query_timeout),
        Channel::Postgres => Some(query_timeout),
        _ => None,
    }
}
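
/// Fails immediately if the timeout is already exhausted; otherwise wraps a
/// streaming output so the remaining timeout keeps being enforced while the
/// stream is consumed.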
fn attach_timeout(output: Output, mut timeout: Duration) -> Result<Output> {
    if timeout.is_zero() {
        return StatementTimeoutSnafu.fail();
    }

    let output = match output.data {
        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
        OutputData::Stream(mut stream) => {
            let schema = stream.schema();
            let s = Box::pin(stream! {
                let mut start = tokio::time::Instant::now();
                while let Some(item) = tokio::time::timeout(timeout, stream.next()).await.map_err(|_| StreamTimeoutSnafu.build())? {
                    yield item;

                    let now = tokio::time::Instant::now();
                    timeout = timeout.checked_sub(now - start).unwrap_or(Duration::ZERO);
                    start = now;
                    // tokio::time::timeout may not return an error immediately when timeout is 0.
                    if timeout.is_zero() {
                        StreamTimeoutSnafu.fail()?;
                    }
                }
            }) as Pin<Box<dyn Stream<Item = _> + Send>>;
            let stream = RecordBatchStreamWrapper {
                schema,
                stream: s,
                output_ordering: None,
                metrics: Default::default(),
            };
            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
        }
    };

    Ok(output)
}

#[async_trait]
impl SqlQueryHandler for Instance {
    type Error = Error;

    #[tracing::instrument(skip_all)]
    async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
        let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
        let query_interceptor = query_interceptor_opt.as_ref();
        let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) {
            Ok(q) => q,
            Err(e) => return vec![Err(e)],
        };

        let checker_ref = self.plugins.get::<PermissionCheckerRef>();
        let checker = checker_ref.as_ref();

        match parse_stmt(query.as_ref(), query_ctx.sql_dialect())
            .and_then(|stmts| query_interceptor.post_parsing(stmts, query_ctx.clone()))
        {
            Ok(stmts) => {
                if stmts.is_empty() {
                    return vec![
                        InvalidSqlSnafu {
                            err_msg: "empty statements",
                        }
                        .fail(),
                    ];
                }

                let mut results = Vec::with_capacity(stmts.len());
                for stmt in stmts {
                    if let Err(e) = checker
                        .check_permission(
                            query_ctx.current_user(),
                            PermissionReq::SqlStatement(&stmt),
                        )
                        .context(PermissionSnafu)
                    {
                        results.push(Err(e));
                        break;
                    }

                    match self.query_statement(stmt.clone(), query_ctx.clone()).await {
                        Ok(output) => {
                            let output_result =
                                query_interceptor.post_execute(output, query_ctx.clone());
                            results.push(output_result);
                        }
                        Err(e) => {
                            if e.status_code().should_log_error() {
                                error!(e; "Failed to execute query: {stmt}");
                            } else {
                                debug!("Failed to execute query: {stmt}, {e}");
                            }
                            results.push(Err(e));
                            break;
                        }
                    }
                }
                results
            }
            Err(e) => {
                vec![Err(e)]
            }
        }
    }

    async fn do_exec_plan(
        &self,
        stmt: Option<Statement>,
        plan: LogicalPlan,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        if should_capture_statement(stmt.as_ref()) {
            // Safe to unwrap: `should_capture_statement` returns true only when `stmt` is `Some`.
            let stmt = stmt.unwrap();
            let query = stmt.to_string();
            let slow_query_timer = self
                .slow_query_options
                .enable
                .then(|| self.event_recorder.clone())
                .flatten()
                .map(|event_recorder| {
                    SlowQueryTimer::new(
                        CatalogQueryStatement::Sql(stmt.clone()),
                        self.slow_query_options.threshold,
                        self.slow_query_options.sample_ratio,
                        self.slow_query_options.record_type,
                        event_recorder,
                    )
                });

            let ticket = self.process_manager.register_query(
                query_ctx.current_catalog().to_string(),
                vec![query_ctx.current_schema()],
                query,
                query_ctx.conn_info().to_string(),
                Some(query_ctx.process_id()),
                slow_query_timer,
            );

            let query_fut = self.query_engine.execute(plan.clone(), query_ctx);

            CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
                .await
                .map_err(|_| error::CancelledSnafu.build())?
                .map(|output| {
                    let Output { meta, data } = output;

                    let data = match data {
                        OutputData::Stream(stream) => OutputData::Stream(Box::pin(
                            CancellableStreamWrapper::new(stream, ticket),
                        )),
                        other => other,
                    };
                    Output { data, meta }
                })
                .context(ExecLogicalPlanSnafu)
        } else {
            // The plan should already be prepared (and checked) before execution.
            self.query_engine
                .execute(plan.clone(), query_ctx)
                .await
                .context(ExecLogicalPlanSnafu)
        }
    }

    #[tracing::instrument(skip_all)]
    async fn do_promql_query(
        &self,
        query: &PromQuery,
        query_ctx: QueryContextRef,
    ) -> Vec<Result<Output>> {
        // The permission check is done in the Prometheus handler's do_query.
        let result = PrometheusHandler::do_query(self, query, query_ctx)
            .await
            .with_context(|_| ExecutePromqlSnafu {
                query: format!("{query:?}"),
            });
        vec![result]
    }

    async fn do_describe(
        &self,
        stmt: Statement,
        query_ctx: QueryContextRef,
    ) -> Result<Option<DescribeResult>> {
        if matches!(
            stmt,
            Statement::Insert(_) | Statement::Query(_) | Statement::Delete(_)
        ) {
            self.plugins
                .get::<PermissionCheckerRef>()
                .as_ref()
                .check_permission(query_ctx.current_user(), PermissionReq::SqlStatement(&stmt))
                .context(PermissionSnafu)?;

            let plan = self
                .query_engine
                .planner()
                .plan(&QueryStatement::Sql(stmt), query_ctx.clone())
                .await
                .context(PlanStatementSnafu)?;
            self.query_engine
                .describe(plan, query_ctx)
                .await
                .map(Some)
                .context(error::DescribeStatementSnafu)
        } else {
            Ok(None)
        }
    }

    async fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result<bool> {
        self.catalog_manager
            .schema_exists(catalog, schema, None)
            .await
            .context(error::CatalogSnafu)
    }
}

/// Attaches a timer to the output and observes it once the output is exhausted.
pub fn attach_timer(output: Output, timer: HistogramTimer) -> Output {
    match output.data {
        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
        OutputData::Stream(stream) => {
            let stream = OnDone::new(stream, move || {
                timer.observe_duration();
            });
            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
        }
    }
}

#[async_trait]
impl PrometheusHandler for Instance {
    #[tracing::instrument(skip_all)]
    async fn do_query(
        &self,
        query: &PromQuery,
        query_ctx: QueryContextRef,
    ) -> server_error::Result<Output> {
        let interceptor = self
            .plugins
            .get::<PromQueryInterceptorRef<server_error::Error>>();

        self.plugins
            .get::<PermissionCheckerRef>()
            .as_ref()
            .check_permission(query_ctx.current_user(), PermissionReq::PromQuery)
            .context(AuthSnafu)?;

        let stmt = QueryLanguageParser::parse_promql(query, &query_ctx).with_context(|_| {
            ParsePromQLSnafu {
                query: query.clone(),
            }
        })?;

        let plan = self
            .statement_executor
            .plan(&stmt, query_ctx.clone())
            .await
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)?;

        interceptor.pre_execute(query, Some(&plan), query_ctx.clone())?;

        // Take the EvalStmt from the original QueryStatement and use it to create the CatalogQueryStatement.
        let query_statement = if let QueryStatement::Promql(eval_stmt, alias) = stmt {
            CatalogQueryStatement::Promql(eval_stmt, alias)
        } else {
            // It should not happen since the query is already parsed successfully.
            return UnexpectedResultSnafu {
                reason: "The query should always be promql.".to_string(),
            }
            .fail();
        };
        let query = query_statement.to_string();

        let slow_query_timer = self
            .slow_query_options
            .enable
            .then(|| self.event_recorder.clone())
            .flatten()
            .map(|event_recorder| {
                SlowQueryTimer::new(
                    query_statement,
                    self.slow_query_options.threshold,
                    self.slow_query_options.sample_ratio,
                    self.slow_query_options.record_type,
                    event_recorder,
                )
            });

        let ticket = self.process_manager.register_query(
            query_ctx.current_catalog().to_string(),
            vec![query_ctx.current_schema()],
            query,
            query_ctx.conn_info().to_string(),
            Some(query_ctx.process_id()),
            slow_query_timer,
        );

        let query_fut = self.statement_executor.exec_plan(plan, query_ctx.clone());

        let output = CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
            .await
            .map_err(|_| servers::error::CancelledSnafu.build())?
            .map(|output| {
                let Output { meta, data } = output;
                let data = match data {
                    OutputData::Stream(stream) => {
                        OutputData::Stream(Box::pin(CancellableStreamWrapper::new(stream, ticket)))
                    }
                    other => other,
                };
                Output { data, meta }
            })
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)?;

        Ok(interceptor.post_execute(output, query_ctx)?)
    }

    async fn query_metric_names(
        &self,
        matchers: Vec<Matcher>,
        ctx: &QueryContextRef,
    ) -> server_error::Result<Vec<String>> {
        self.handle_query_metric_names(matchers, ctx)
            .await
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)
    }

    async fn query_label_values(
        &self,
        metric: String,
        label_name: String,
        matchers: Vec<Matcher>,
        start: SystemTime,
        end: SystemTime,
        ctx: &QueryContextRef,
    ) -> server_error::Result<Vec<String>> {
        self.handle_query_label_values(metric, label_name, matchers, start, end, ctx)
            .await
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)
    }

    fn catalog_manager(&self) -> CatalogManagerRef {
        self.catalog_manager.clone()
    }
}

/// Validates the permission of `stmt.database` if it is present.
macro_rules! validate_db_permission {
    ($stmt: expr, $query_ctx: expr) => {
        if let Some(database) = &$stmt.database {
            validate_catalog_and_schema($query_ctx.current_catalog(), database, $query_ctx)
                .map_err(BoxedError::new)
                .context(SqlExecInterceptedSnafu)?;
        }
    };
}
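
/// Checks whether `stmt` is allowed to run in the current session when
/// cross-catalog queries are disallowed, by validating every catalog/schema
/// the statement references against the query context.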
pub fn check_permission(
    plugins: Plugins,
    stmt: &Statement,
    query_ctx: &QueryContextRef,
) -> Result<()> {
    let need_validate = plugins
        .get::<QueryOptions>()
        .map(|opts| opts.disallow_cross_catalog_query)
        .unwrap_or_default();

    if !need_validate {
        return Ok(());
    }

    match stmt {
        // Will be checked in execution.
        // TODO(dennis): add a hook for admin commands.
        Statement::Admin(_) => {}
        // These are executed by query engine, and will be checked there.
        Statement::Query(_)
        | Statement::Explain(_)
        | Statement::Tql(_)
        | Statement::Delete(_)
        | Statement::DeclareCursor(_)
        | Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(_)) => {}
        // database ops won't be checked
        Statement::CreateDatabase(_)
        | Statement::ShowDatabases(_)
        | Statement::DropDatabase(_)
        | Statement::AlterDatabase(_)
        | Statement::DropFlow(_)
        | Statement::Use(_) => {}
        #[cfg(feature = "enterprise")]
        Statement::DropTrigger(_) => {}
        Statement::ShowCreateDatabase(stmt) => {
            validate_database(&stmt.database_name, query_ctx)?;
        }
        Statement::ShowCreateTable(stmt) => {
            validate_param(&stmt.table_name, query_ctx)?;
        }
        Statement::ShowCreateFlow(stmt) => {
            validate_param(&stmt.flow_name, query_ctx)?;
        }
        #[cfg(feature = "enterprise")]
        Statement::ShowCreateTrigger(stmt) => {
            validate_param(&stmt.trigger_name, query_ctx)?;
        }
        Statement::ShowCreateView(stmt) => {
            validate_param(&stmt.view_name, query_ctx)?;
        }
        Statement::CreateExternalTable(stmt) => {
            validate_param(&stmt.name, query_ctx)?;
        }
        Statement::CreateFlow(stmt) => {
            // TODO: should also validate source table name here?
            validate_param(&stmt.sink_table_name, query_ctx)?;
        }
        #[cfg(feature = "enterprise")]
        Statement::CreateTrigger(stmt) => {
            validate_param(&stmt.trigger_name, query_ctx)?;
        }
        Statement::CreateView(stmt) => {
            validate_param(&stmt.name, query_ctx)?;
        }
        Statement::AlterTable(stmt) => {
            validate_param(stmt.table_name(), query_ctx)?;
        }
        #[cfg(feature = "enterprise")]
        Statement::AlterTrigger(_) => {}
        // SET/SHOW VARIABLES currently only alter or show session-level variables.
        Statement::SetVariables(_) | Statement::ShowVariables(_) => {}
        // show charset and show collation won't be checked
        Statement::ShowCharset(_) | Statement::ShowCollation(_) => {}

        Statement::Insert(insert) => {
            let name = insert.table_name().context(ParseSqlSnafu)?;
            validate_param(name, query_ctx)?;
        }
        Statement::CreateTable(stmt) => {
            validate_param(&stmt.name, query_ctx)?;
        }
        Statement::CreateTableLike(stmt) => {
            validate_param(&stmt.table_name, query_ctx)?;
            validate_param(&stmt.source_name, query_ctx)?;
        }
        Statement::DropTable(drop_stmt) => {
            for table_name in drop_stmt.table_names() {
                validate_param(table_name, query_ctx)?;
            }
        }
        Statement::DropView(stmt) => {
            validate_param(&stmt.view_name, query_ctx)?;
        }
        Statement::ShowTables(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowTableStatus(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowColumns(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowIndex(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowRegion(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowViews(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowFlows(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        #[cfg(feature = "enterprise")]
        Statement::ShowTriggers(_stmt) => {
            // The trigger is organized based on the catalog dimension, so there
            // is no need to check the permission of the database(schema).
        }
        Statement::ShowStatus(_stmt) => {}
        Statement::ShowSearchPath(_stmt) => {}
        Statement::DescribeTable(stmt) => {
            validate_param(stmt.name(), query_ctx)?;
        }
        Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => match stmt {
            CopyTable::To(copy_table_to) => validate_param(&copy_table_to.table_name, query_ctx)?,
            CopyTable::From(copy_table_from) => {
                validate_param(&copy_table_from.table_name, query_ctx)?
            }
        },
        Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
            match copy_database {
                CopyDatabase::To(stmt) => validate_database(&stmt.database_name, query_ctx)?,
                CopyDatabase::From(stmt) => validate_database(&stmt.database_name, query_ctx)?,
            }
        }
        Statement::TruncateTable(stmt) => {
            validate_param(stmt.table_name(), query_ctx)?;
        }
        // Cursor operations are always allowed once the cursor is created.
        Statement::FetchCursor(_) | Statement::CloseCursor(_) => {}
        // Users can only kill processes in their own catalog.
        Statement::Kill(_) => {}
        // SHOW PROCESSLIST
        Statement::ShowProcesslist(_) => {}
    }
    Ok(())
}
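
/// Resolves a table name to its fully qualified (catalog, schema, table) form
/// and validates that the session may access that catalog and schema.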
fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
    let (catalog, schema, _) = table_idents_to_full_name(name, query_ctx)
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?;

    validate_catalog_and_schema(&catalog, &schema, query_ctx)
        .map_err(BoxedError::new)
        .context(SqlExecInterceptedSnafu)
}
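
/// Validates a database name of the form `<catalog>.<schema>` or `<schema>`;
/// in the latter case the session's current catalog is assumed.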
fn validate_database(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
    let (catalog, schema) = match &name.0[..] {
        [schema] => (
            query_ctx.current_catalog().to_string(),
            schema.to_string_unquoted(),
        ),
        [catalog, schema] => (catalog.to_string_unquoted(), schema.to_string_unquoted()),
        _ => InvalidSqlSnafu {
            err_msg: format!(
                "expect database name to be <catalog>.<schema> or <schema>, actual: {name}",
            ),
        }
        .fail()?,
    };

    validate_catalog_and_schema(&catalog, &schema, query_ctx)
        .map_err(BoxedError::new)
        .context(SqlExecInterceptedSnafu)
}

/// Returns true if the statement should be captured for process tracking,
/// i.e. a query ticket and slow query timer should be created for it. This
/// applies to queries and other read-only statements.
fn should_capture_statement(stmt: Option<&Statement>) -> bool {
    if let Some(stmt) = stmt {
        matches!(stmt, Statement::Query(_)) || stmt.is_readonly()
    } else {
        false
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::{Arc, Barrier};
    use std::thread;
    use std::time::{Duration, Instant};

    use common_base::Plugins;
    use query::query_engine::options::QueryOptions;
    use session::context::QueryContext;
    use sql::dialect::GreptimeDbDialect;
    use strfmt::Format;

    use super::*;

    #[test]
    fn test_fast_legacy_check_deadlock_prevention() {
        // Create a DashMap to simulate the cache
        let cache = DashMap::new();

        // Pre-populate cache with some entries
        cache.insert("metric1".to_string(), true); // legacy mode
        cache.insert("metric2".to_string(), false); // prom mode
        cache.insert("metric3".to_string(), true); // legacy mode

        // Test case 1: Normal operation with cache hits
        let metric1 = "metric1".to_string();
        let metric4 = "metric4".to_string();
        let names1 = vec![&metric1, &metric4];
        let result = fast_legacy_check(&cache, &names1);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), Some(true)); // should return legacy mode

        // Verify that metric4 was added to cache
        assert!(cache.contains_key("metric4"));
        assert!(*cache.get("metric4").unwrap().value());

        // Test case 2: No cache hits
        let metric5 = "metric5".to_string();
        let metric6 = "metric6".to_string();
        let names2 = vec![&metric5, &metric6];
        let result = fast_legacy_check(&cache, &names2);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), None); // should return None as no cache hits

        // Test case 3: Incompatible modes should return error
        let cache_incompatible = DashMap::new();
        cache_incompatible.insert("metric1".to_string(), true); // legacy
        cache_incompatible.insert("metric2".to_string(), false); // prom
        let metric1_test = "metric1".to_string();
        let metric2_test = "metric2".to_string();
        let names3 = vec![&metric1_test, &metric2_test];
        let result = fast_legacy_check(&cache_incompatible, &names3);
        assert!(result.is_err()); // should error due to incompatible modes

        // Test case 4: Intensive concurrent access to test deadlock prevention
        // This test specifically targets the scenario where multiple threads
        // access the same cache entries simultaneously
        let cache_concurrent = Arc::new(DashMap::new());
        cache_concurrent.insert("shared_metric".to_string(), true);

        let num_threads = 8;
        let operations_per_thread = 100;
        let barrier = Arc::new(Barrier::new(num_threads));
        let success_flag = Arc::new(AtomicBool::new(true));

        let handles: Vec<_> = (0..num_threads)
            .map(|thread_id| {
                let cache_clone = Arc::clone(&cache_concurrent);
                let barrier_clone = Arc::clone(&barrier);
                let success_flag_clone = Arc::clone(&success_flag);

                thread::spawn(move || {
                    // Wait for all threads to be ready
                    barrier_clone.wait();

                    let start_time = Instant::now();
                    for i in 0..operations_per_thread {
                        // Each operation references existing cache entry and adds new ones
                        let shared_metric = "shared_metric".to_string();
                        let new_metric = format!("thread_{}_metric_{}", thread_id, i);
                        let names = vec![&shared_metric, &new_metric];

                        match fast_legacy_check(&cache_clone, &names) {
                            Ok(_) => {}
                            Err(_) => {
                                success_flag_clone.store(false, Ordering::Relaxed);
                                return;
                            }
                        }

                        // If the test takes too long, it likely means deadlock
                        if start_time.elapsed() > Duration::from_secs(10) {
                            success_flag_clone.store(false, Ordering::Relaxed);
                            return;
                        }
                    }
                })
            })
            .collect();

        // Join all threads with timeout
        let start_time = Instant::now();
        for (i, handle) in handles.into_iter().enumerate() {
            let join_result = handle.join();

            // Check if we're taking too long (potential deadlock)
            if start_time.elapsed() > Duration::from_secs(30) {
                panic!("Test timed out - possible deadlock detected!");
            }

            if join_result.is_err() {
                panic!("Thread {} panicked during execution", i);
            }
        }

        // Verify all operations completed successfully
        assert!(
            success_flag.load(Ordering::Relaxed),
            "Some operations failed"
        );

        // Verify that many new entries were added (proving operations completed)
        let final_count = cache_concurrent.len();
        assert!(
            final_count > 1 + num_threads * operations_per_thread / 2,
            "Expected more cache entries, got {}",
            final_count
        );
    }

    #[test]
    fn test_exec_validation() {
        let query_ctx = QueryContext::arc();
        let plugins: Plugins = Plugins::new();
        plugins.insert(QueryOptions {
            disallow_cross_catalog_query: true,
        });

        let sql = r#"
        SELECT * FROM demo;
        EXPLAIN SELECT * FROM demo;
        CREATE DATABASE test_database;
        SHOW DATABASES;
        "#;
        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        assert_eq!(stmts.len(), 4);
        for stmt in stmts {
            let re = check_permission(plugins.clone(), &stmt, &query_ctx);
            re.unwrap();
        }

        let sql = r#"
        SHOW CREATE TABLE demo;
        ALTER TABLE demo ADD COLUMN new_col INT;
        "#;
        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        assert_eq!(stmts.len(), 2);
        for stmt in stmts {
            let re = check_permission(plugins.clone(), &stmt, &query_ctx);
            re.unwrap();
        }

        fn replace_test(template_sql: &str, plugins: Plugins, query_ctx: &QueryContextRef) {
            // test right
            let right = vec![("", ""), ("", "public."), ("greptime.", "public.")];
            for (catalog, schema) in right {
                let sql = do_fmt(template_sql, catalog, schema);
                do_test(&sql, plugins.clone(), query_ctx, true);
            }

            let wrong = vec![
                ("wrongcatalog.", "public."),
                ("wrongcatalog.", "wrongschema."),
            ];
            for (catalog, schema) in wrong {
                let sql = do_fmt(template_sql, catalog, schema);
                do_test(&sql, plugins.clone(), query_ctx, false);
            }
        }

        fn do_fmt(template: &str, catalog: &str, schema: &str) -> String {
            let vars = HashMap::from([
                ("catalog".to_string(), catalog),
                ("schema".to_string(), schema),
            ]);
            template.format(&vars).unwrap()
        }

        fn do_test(sql: &str, plugins: Plugins, query_ctx: &QueryContextRef, is_ok: bool) {
            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
            let re = check_permission(plugins, stmt, query_ctx);
            if is_ok {
                re.unwrap();
            } else {
                assert!(re.is_err());
            }
        }

        // test insert
        let sql = "INSERT INTO {catalog}{schema}monitor(host) VALUES ('host1');";
        replace_test(sql, plugins.clone(), &query_ctx);

        // test create table
        let sql = r#"CREATE TABLE {catalog}{schema}demo(
                            host STRING,
                            ts TIMESTAMP,
                            TIME INDEX (ts),
                            PRIMARY KEY(host)
                        ) engine=mito;"#;
        replace_test(sql, plugins.clone(), &query_ctx);

        // test drop table
        let sql = "DROP TABLE {catalog}{schema}demo;";
        replace_test(sql, plugins.clone(), &query_ctx);

        // test show tables
        let sql = "SHOW TABLES FROM public";
        let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        check_permission(plugins.clone(), &stmt[0], &query_ctx).unwrap();

        let sql = "SHOW TABLES FROM private";
        let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        let re = check_permission(plugins.clone(), &stmt[0], &query_ctx);
        assert!(re.is_ok());

        // test describe table
        let sql = "DESC TABLE {catalog}{schema}demo;";
        replace_test(sql, plugins, &query_ctx);
    }
}