frontend/instance.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod builder;
mod grpc;
mod influxdb;
mod jaeger;
mod log_handler;
mod logs;
mod opentsdb;
mod otlp;
pub mod prom_store;
mod promql;
mod region_query;
pub mod standalone;

use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use async_stream::stream;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use catalog::CatalogManagerRef;
use catalog::process_manager::{
    ProcessManagerRef, QueryStatement as CatalogQueryStatement, SlowQueryTimer,
};
use client::OutputData;
use common_base::Plugins;
use common_base::cancellation::CancellableFuture;
use common_config::KvBackendConfig;
use common_error::ext::{BoxedError, ErrorExt};
use common_event_recorder::EventRecorderRef;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::runtime_switch::RuntimeSwitchManager;
use common_meta::key::table_name::TableNameKey;
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::NodeManagerRef;
use common_meta::procedure_executor::ProcedureExecutorRef;
use common_meta::state_store::KvStateStore;
use common_procedure::ProcedureManagerRef;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::options::ProcedureConfig;
use common_query::Output;
use common_recordbatch::RecordBatchStreamWrapper;
use common_recordbatch::error::StreamTimeoutSnafu;
use common_telemetry::logging::SlowQueryOptions;
use common_telemetry::{debug, error, info, tracing};
use dashmap::DashMap;
use datafusion_expr::LogicalPlan;
use futures::{Stream, StreamExt};
use lazy_static::lazy_static;
use log_store::raft_engine::RaftEngineBackend;
use operator::delete::DeleterRef;
use operator::insert::InserterRef;
use operator::statement::{StatementExecutor, StatementExecutorRef};
use partition::manager::PartitionRuleManagerRef;
use pipeline::pipeline_operator::PipelineOperator;
use prometheus::HistogramTimer;
use promql_parser::label::Matcher;
use query::QueryEngineRef;
use query::metrics::OnDone;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::query_engine::DescribeResult;
use query::query_engine::options::{QueryOptions, validate_catalog_and_schema};
use servers::error::{
    self as server_error, AuthSnafu, CommonMetaSnafu, ExecuteQuerySnafu,
    OtlpMetricModeIncompatibleSnafu, ParsePromQLSnafu, UnexpectedResultSnafu,
};
use servers::interceptor::{
    PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
};
use servers::otlp::metrics::legacy_normalize_otlp_name;
use servers::prometheus_handler::PrometheusHandler;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::{Channel, QueryContextRef};
use session::table_name::table_idents_to_full_name;
use snafu::prelude::*;
use sql::ast::ObjectNamePartExt;
use sql::dialect::Dialect;
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::copy::{CopyDatabase, CopyTable};
use sql::statements::statement::Statement;
use sql::statements::tql::Tql;
use sqlparser::ast::ObjectName;
pub use standalone::StandaloneDatanodeManager;
use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM};

use crate::error::{
    self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, InvalidSqlSnafu,
    ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
    StatementTimeoutSnafu, TableOperationSnafu,
};
use crate::limiter::LimiterRef;
use crate::stream_wrapper::CancellableStreamWrapper;

lazy_static! {
    static ref OTLP_LEGACY_DEFAULT_VALUE: String = "legacy".to_string();
}

/// The frontend instance contains necessary components, and implements many
/// traits, like [`servers::query_handler::grpc::GrpcQueryHandler`],
/// [`servers::query_handler::sql::SqlQueryHandler`], etc.
#[derive(Clone)]
pub struct Instance {
    catalog_manager: CatalogManagerRef,
    pipeline_operator: Arc<PipelineOperator>,
    statement_executor: Arc<StatementExecutor>,
    query_engine: QueryEngineRef,
    plugins: Plugins,
    inserter: InserterRef,
    deleter: DeleterRef,
    table_metadata_manager: TableMetadataManagerRef,
    event_recorder: Option<EventRecorderRef>,
    limiter: Option<LimiterRef>,
    process_manager: ProcessManagerRef,
    slow_query_options: SlowQueryOptions,

    // Cache for OTLP metrics tables.
    // First-layer key: the db-string of the request context.
    // Second-layer key: the metric name exactly as given in the input.
    // Value: whether the table runs in legacy mode.
    otlp_metrics_table_legacy_cache: DashMap<String, DashMap<String, bool>>,
}
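
// A minimal sketch of the two-layer cache shape above, using only the
// `dashmap` API; the db-string and metric name here are made up for
// illustration. Note that an outer-map entry guard must be dropped before
// touching the same map again, the same discipline `check_otlp_legacy`
// follows below.
#[cfg(test)]
mod otlp_cache_shape_example {
    use dashmap::DashMap;

    #[test]
    fn two_layer_lookup() {
        let cache: DashMap<String, DashMap<String, bool>> = DashMap::new();

        // `entry().or_default()` lazily creates the per-db inner map.
        let inner = cache.entry("greptime-public".to_string()).or_default();
        inner.insert("http_requests_total".to_string(), true);
        // Release the outer-map guard before reading the map again.
        drop(inner);

        let inner = cache.get("greptime-public").unwrap();
        assert!(*inner.get("http_requests_total").unwrap().value());
    }
}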

impl Instance {
    pub async fn try_build_standalone_components(
        dir: String,
        kv_backend_config: KvBackendConfig,
        procedure_config: ProcedureConfig,
    ) -> Result<(KvBackendRef, ProcedureManagerRef)> {
        info!(
            "Creating metadata kvbackend with config: {:?}",
            kv_backend_config
        );
        let kv_backend = RaftEngineBackend::try_open_with_cfg(dir, &kv_backend_config)
            .map_err(BoxedError::new)
            .context(error::OpenRaftEngineBackendSnafu)?;

        let kv_backend = Arc::new(kv_backend);
        let kv_state_store = Arc::new(KvStateStore::new(kv_backend.clone()));

        let manager_config = ManagerConfig {
            max_retry_times: procedure_config.max_retry_times,
            retry_delay: procedure_config.retry_delay,
            max_running_procedures: procedure_config.max_running_procedures,
            ..Default::default()
        };
        let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend.clone()));
        let procedure_manager = Arc::new(LocalManager::new(
            manager_config,
            kv_state_store.clone(),
            kv_state_store,
            Some(runtime_switch_manager),
            None,
        ));

        Ok((kv_backend, procedure_manager))
    }

    pub fn catalog_manager(&self) -> &CatalogManagerRef {
        &self.catalog_manager
    }

    pub fn query_engine(&self) -> &QueryEngineRef {
        &self.query_engine
    }

    pub fn plugins(&self) -> &Plugins {
        &self.plugins
    }

    pub fn statement_executor(&self) -> &StatementExecutorRef {
        &self.statement_executor
    }

    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
        &self.table_metadata_manager
    }

    pub fn inserter(&self) -> &InserterRef {
        &self.inserter
    }

    pub fn process_manager(&self) -> &ProcessManagerRef {
        &self.process_manager
    }

    pub fn node_manager(&self) -> &NodeManagerRef {
        self.inserter.node_manager()
    }

    pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
        self.inserter.partition_manager()
    }

    pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
        self.statement_executor.cache_invalidator()
    }

    pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
        self.statement_executor.procedure_executor()
    }
}

fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
    ParserContext::create_with_dialect(sql, dialect, ParseOptions::default()).context(ParseSqlSnafu)
}
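
// A small usage sketch for `parse_stmt` (the SQL here is illustrative only):
// it splits a multi-statement string into one `Statement` per statement,
// using the session's dialect.
#[cfg(test)]
mod parse_stmt_example {
    use sql::dialect::GreptimeDbDialect;

    use super::parse_stmt;

    #[test]
    fn splits_statements() {
        let stmts = parse_stmt("SELECT 1; SHOW DATABASES;", &GreptimeDbDialect {}).unwrap();
        assert_eq!(stmts.len(), 2);
    }
}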

impl Instance {
    async fn query_statement(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
        check_permission(self.plugins.clone(), &stmt, &query_ctx)?;

        let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
        let query_interceptor = query_interceptor.as_ref();

        if should_capture_statement(Some(&stmt)) {
            let slow_query_timer = self
                .slow_query_options
                .enable
                .then(|| self.event_recorder.clone())
                .flatten()
                .map(|event_recorder| {
                    SlowQueryTimer::new(
                        CatalogQueryStatement::Sql(stmt.clone()),
                        self.slow_query_options.threshold,
                        self.slow_query_options.sample_ratio,
                        self.slow_query_options.record_type,
                        event_recorder,
                    )
                });

            let ticket = self.process_manager.register_query(
                query_ctx.current_catalog().to_string(),
                vec![query_ctx.current_schema()],
                stmt.to_string(),
                query_ctx.conn_info().to_string(),
                Some(query_ctx.process_id()),
                slow_query_timer,
            );

            let query_fut = self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor);

            CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
                .await
                .map_err(|_| error::CancelledSnafu.build())?
                .map(|output| {
                    let Output { meta, data } = output;

                    let data = match data {
                        OutputData::Stream(stream) => OutputData::Stream(Box::pin(
                            CancellableStreamWrapper::new(stream, ticket),
                        )),
                        other => other,
                    };
                    Output { data, meta }
                })
        } else {
            self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor)
                .await
        }
    }

    async fn exec_statement_with_timeout(
        &self,
        stmt: Statement,
        query_ctx: QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
    ) -> Result<Output> {
        let timeout = derive_timeout(&stmt, &query_ctx);
        match timeout {
            Some(timeout) => {
                let start = tokio::time::Instant::now();
                let output = tokio::time::timeout(
                    timeout,
                    self.exec_statement(stmt, query_ctx, query_interceptor),
                )
                .await
                .map_err(|_| StatementTimeoutSnafu.build())??;
                // compute remaining timeout
                let remaining_timeout = timeout.checked_sub(start.elapsed()).unwrap_or_default();
                attach_timeout(output, remaining_timeout)
            }
            None => {
                self.exec_statement(stmt, query_ctx, query_interceptor)
                    .await
            }
        }
    }

    async fn exec_statement(
        &self,
        stmt: Statement,
        query_ctx: QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
    ) -> Result<Output> {
        match stmt {
            Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
                // TODO: remove this when format is supported in datafusion
                if let Statement::Explain(explain) = &stmt
                    && let Some(format) = explain.format()
                {
                    query_ctx.set_explain_format(format.to_string());
                }

                self.plan_and_exec_sql(stmt, &query_ctx, query_interceptor)
                    .await
            }
            Statement::Tql(tql) => {
                self.plan_and_exec_tql(&query_ctx, query_interceptor, tql)
                    .await
            }
            _ => {
                query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?;
                self.statement_executor
                    .execute_sql(stmt, query_ctx)
                    .await
                    .context(TableOperationSnafu)
            }
        }
    }

    async fn plan_and_exec_sql(
        &self,
        stmt: Statement,
        query_ctx: &QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
    ) -> Result<Output> {
        let stmt = QueryStatement::Sql(stmt);
        let plan = self
            .statement_executor
            .plan(&stmt, query_ctx.clone())
            .await?;
        let QueryStatement::Sql(stmt) = stmt else {
            unreachable!()
        };
        query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?;
        self.statement_executor
            .exec_plan(plan, query_ctx.clone())
            .await
            .context(TableOperationSnafu)
    }

    async fn plan_and_exec_tql(
        &self,
        query_ctx: &QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
        tql: Tql,
    ) -> Result<Output> {
        let plan = self
            .statement_executor
            .plan_tql(tql.clone(), query_ctx)
            .await?;
        query_interceptor.pre_execute(&Statement::Tql(tql), Some(&plan), query_ctx.clone())?;
        self.statement_executor
            .exec_plan(plan, query_ctx.clone())
            .await
            .context(TableOperationSnafu)
    }

    async fn check_otlp_legacy(
        &self,
        names: &[&String],
        ctx: QueryContextRef,
    ) -> server_error::Result<bool> {
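        // Resolution order: (1) the per-db cache, (2) existing table names in
        // the metadata, (3) the OTLP_METRIC_COMPAT_KEY table option. If none
        // of these decides, default to the new (prom-style) mode.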
        let db_string = ctx.get_db_string();
        // fast cache check
        let cache = self
            .otlp_metrics_table_legacy_cache
            .entry(db_string.clone())
            .or_default();
        if let Some(flag) = fast_legacy_check(&cache, names)? {
            return Ok(flag);
        }
        // release cache reference to avoid lock contention
        drop(cache);

        let catalog = ctx.current_catalog();
        let schema = ctx.current_schema();

        // query legacy table names
        let normalized_names = names
            .iter()
            .map(|n| legacy_normalize_otlp_name(n))
            .collect::<Vec<_>>();
        let table_names = normalized_names
            .iter()
            .map(|n| TableNameKey::new(catalog, &schema, n))
            .collect::<Vec<_>>();
        let table_values = self
            .table_metadata_manager()
            .table_name_manager()
            .batch_get(table_names)
            .await
            .context(CommonMetaSnafu)?;
        let table_ids = table_values
            .into_iter()
            .filter_map(|v| v.map(|vi| vi.table_id()))
            .collect::<Vec<_>>();

        // No existing table was found; use the new mode.
        if table_ids.is_empty() {
            let cache = self
                .otlp_metrics_table_legacy_cache
                .entry(db_string)
                .or_default();
            names.iter().for_each(|name| {
                cache.insert(name.to_string(), false);
            });
            return Ok(false);
        }

        // Existing tables were found; check their table options.
        let table_infos = self
            .table_metadata_manager()
            .table_info_manager()
            .batch_get(&table_ids)
            .await
            .context(CommonMetaSnafu)?;
        let options = table_infos
            .values()
            .map(|info| {
                info.table_info
                    .meta
                    .options
                    .extra_options
                    .get(OTLP_METRIC_COMPAT_KEY)
                    .unwrap_or(&OTLP_LEGACY_DEFAULT_VALUE)
            })
            .collect::<Vec<_>>();
        let cache = self
            .otlp_metrics_table_legacy_cache
            .entry(db_string)
            .or_default();
        if !options.is_empty() {
            // check value consistency
            let has_prom = options.iter().any(|opt| *opt == OTLP_METRIC_COMPAT_PROM);
            let has_legacy = options
                .iter()
                .any(|opt| *opt == OTLP_LEGACY_DEFAULT_VALUE.as_str());
            ensure!(!(has_prom && has_legacy), OtlpMetricModeIncompatibleSnafu);
            let flag = has_legacy;
            names.iter().for_each(|name| {
                cache.insert(name.to_string(), flag);
            });
            Ok(flag)
        } else {
            // No table info was found; use the new mode.
            names.iter().for_each(|name| {
                cache.insert(name.to_string(), false);
            });
            Ok(false)
        }
    }
}

fn fast_legacy_check(
    cache: &DashMap<String, bool>,
    names: &[&String],
) -> server_error::Result<Option<bool>> {
    let hit_cache = names
        .iter()
        .filter_map(|name| cache.get(*name))
        .collect::<Vec<_>>();
    if !hit_cache.is_empty() {
        let hit_legacy = hit_cache.iter().any(|en| *en.value());
        let hit_prom = hit_cache.iter().any(|en| !*en.value());

        // Hits containing both true and false mean the legacy and new modes
        // are mixed for this request; we cannot handle that, so return an error.
        // TODO: add doc links to the error message.
        ensure!(!(hit_legacy && hit_prom), OtlpMetricModeIncompatibleSnafu);

        let flag = hit_legacy;
        // drop hit_cache to release references before inserting to avoid deadlock
        drop(hit_cache);

        // set cache for all names
        names.iter().for_each(|name| {
            if !cache.contains_key(*name) {
                cache.insert(name.to_string(), flag);
            }
        });
        Ok(Some(flag))
    } else {
        Ok(None)
    }
}

/// If the relevant variables are set, the timeout is enforced for all PostgreSQL statements.
/// For MySQL, it applies only to read-only statements.
fn derive_timeout(stmt: &Statement, query_ctx: &QueryContextRef) -> Option<Duration> {
    let query_timeout = query_ctx.query_timeout()?;
    if query_timeout.is_zero() {
        return None;
    }
    match query_ctx.channel() {
        Channel::Mysql if stmt.is_readonly() => Some(query_timeout),
        Channel::Postgres => Some(query_timeout),
        _ => None,
    }
}

fn attach_timeout(output: Output, mut timeout: Duration) -> Result<Output> {
    if timeout.is_zero() {
        return StatementTimeoutSnafu.fail();
    }

    let output = match output.data {
        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
        OutputData::Stream(mut stream) => {
            let schema = stream.schema();
            let s = Box::pin(stream! {
                let mut start = tokio::time::Instant::now();
                while let Some(item) = tokio::time::timeout(timeout, stream.next()).await.map_err(|_| StreamTimeoutSnafu.build())? {
                    yield item;

                    let now = tokio::time::Instant::now();
                    timeout = timeout.checked_sub(now - start).unwrap_or(Duration::ZERO);
                    start = now;
                    // tokio::time::timeout may not return an error immediately when timeout is 0.
                    if timeout.is_zero() {
                        StreamTimeoutSnafu.fail()?;
                    }
                }
            }) as Pin<Box<dyn Stream<Item = _> + Send>>;
            let stream = RecordBatchStreamWrapper {
                schema,
                stream: s,
                output_ordering: None,
                metrics: Default::default(),
            };
            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
        }
    };

    Ok(output)
}
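
// A standalone sketch of the shrinking-timeout pattern used in
// `attach_timeout` above, applied to a plain integer stream instead of
// record batches. Assumes tokio's `macros` and `rt` features are available
// for tests; all names here are illustrative only.
#[cfg(test)]
mod timeout_budget_example {
    use std::time::Duration;

    use futures::StreamExt;

    #[tokio::test]
    async fn budget_shrinks_with_each_item() {
        let mut timeout = Duration::from_millis(50);
        let mut stream = futures::stream::iter(vec![1u32, 2, 3]);
        let mut start = tokio::time::Instant::now();

        while let Some(_item) = tokio::time::timeout(timeout, stream.next())
            .await
            .expect("items arrive instantly, so no timeout is expected")
        {
            // Charge the time spent waiting for this item against the budget,
            // so a slow consumer cannot extend the overall deadline.
            let now = tokio::time::Instant::now();
            timeout = timeout.checked_sub(now - start).unwrap_or(Duration::ZERO);
            start = now;
        }
        assert!(timeout <= Duration::from_millis(50));
    }
}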

#[async_trait]
impl SqlQueryHandler for Instance {
    type Error = Error;

    #[tracing::instrument(skip_all)]
    async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
        let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
        let query_interceptor = query_interceptor_opt.as_ref();
        let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) {
            Ok(q) => q,
            Err(e) => return vec![Err(e)],
        };

        let checker_ref = self.plugins.get::<PermissionCheckerRef>();
        let checker = checker_ref.as_ref();

        match parse_stmt(query.as_ref(), query_ctx.sql_dialect())
            .and_then(|stmts| query_interceptor.post_parsing(stmts, query_ctx.clone()))
        {
            Ok(stmts) => {
                if stmts.is_empty() {
                    return vec![
                        InvalidSqlSnafu {
                            err_msg: "empty statements",
                        }
                        .fail(),
                    ];
                }

                let mut results = Vec::with_capacity(stmts.len());
                for stmt in stmts {
                    if let Err(e) = checker
                        .check_permission(
                            query_ctx.current_user(),
                            PermissionReq::SqlStatement(&stmt),
                        )
                        .context(PermissionSnafu)
                    {
                        results.push(Err(e));
                        break;
                    }

                    match self.query_statement(stmt.clone(), query_ctx.clone()).await {
                        Ok(output) => {
                            let output_result =
                                query_interceptor.post_execute(output, query_ctx.clone());
                            results.push(output_result);
                        }
                        Err(e) => {
                            if e.status_code().should_log_error() {
                                error!(e; "Failed to execute query: {stmt}");
                            } else {
                                debug!("Failed to execute query: {stmt}, {e}");
                            }
                            results.push(Err(e));
                            break;
                        }
                    }
                }
                results
            }
            Err(e) => {
                vec![Err(e)]
            }
        }
    }

    async fn do_exec_plan(
        &self,
        stmt: Option<Statement>,
        plan: LogicalPlan,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        if should_capture_statement(stmt.as_ref()) {
            // It's safe to unwrap here because we've already checked the type.
            let stmt = stmt.unwrap();
            let query = stmt.to_string();
            let slow_query_timer = self
                .slow_query_options
                .enable
                .then(|| self.event_recorder.clone())
                .flatten()
                .map(|event_recorder| {
                    SlowQueryTimer::new(
                        CatalogQueryStatement::Sql(stmt.clone()),
                        self.slow_query_options.threshold,
                        self.slow_query_options.sample_ratio,
                        self.slow_query_options.record_type,
                        event_recorder,
                    )
                });

            let ticket = self.process_manager.register_query(
                query_ctx.current_catalog().to_string(),
                vec![query_ctx.current_schema()],
                query,
                query_ctx.conn_info().to_string(),
                Some(query_ctx.process_id()),
                slow_query_timer,
            );

            let query_fut = self.query_engine.execute(plan.clone(), query_ctx);

            CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
                .await
                .map_err(|_| error::CancelledSnafu.build())?
                .map(|output| {
                    let Output { meta, data } = output;

                    let data = match data {
                        OutputData::Stream(stream) => OutputData::Stream(Box::pin(
                            CancellableStreamWrapper::new(stream, ticket),
                        )),
                        other => other,
                    };
                    Output { data, meta }
                })
                .context(ExecLogicalPlanSnafu)
        } else {
            // The plan should have been prepared (and validated) before
            // execution; the checks are done at that stage.
            self.query_engine
                .execute(plan.clone(), query_ctx)
                .await
                .context(ExecLogicalPlanSnafu)
        }
    }

    #[tracing::instrument(skip_all)]
    async fn do_promql_query(
        &self,
        query: &PromQuery,
        query_ctx: QueryContextRef,
    ) -> Vec<Result<Output>> {
        // The permission check is done in the Prometheus handler's do_query.
        let result = PrometheusHandler::do_query(self, query, query_ctx)
            .await
            .with_context(|_| ExecutePromqlSnafu {
                query: format!("{query:?}"),
            });
        vec![result]
    }

    async fn do_describe(
        &self,
        stmt: Statement,
        query_ctx: QueryContextRef,
    ) -> Result<Option<DescribeResult>> {
        if matches!(
            stmt,
            Statement::Insert(_) | Statement::Query(_) | Statement::Delete(_)
        ) {
            self.plugins
                .get::<PermissionCheckerRef>()
                .as_ref()
                .check_permission(query_ctx.current_user(), PermissionReq::SqlStatement(&stmt))
                .context(PermissionSnafu)?;

            let plan = self
                .query_engine
                .planner()
                .plan(&QueryStatement::Sql(stmt), query_ctx.clone())
                .await
                .context(PlanStatementSnafu)?;
            self.query_engine
                .describe(plan, query_ctx)
                .await
                .map(Some)
                .context(error::DescribeStatementSnafu)
        } else {
            Ok(None)
        }
    }

    async fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result<bool> {
        self.catalog_manager
            .schema_exists(catalog, schema, None)
            .await
            .context(error::CatalogSnafu)
    }
}

/// Attaches a timer to the output and observes it once the output is exhausted.
pub fn attach_timer(output: Output, timer: HistogramTimer) -> Output {
    match output.data {
        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
        OutputData::Stream(stream) => {
            let stream = OnDone::new(stream, move || {
                timer.observe_duration();
            });
            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
        }
    }
}

#[async_trait]
impl PrometheusHandler for Instance {
    #[tracing::instrument(skip_all)]
    async fn do_query(
        &self,
        query: &PromQuery,
        query_ctx: QueryContextRef,
    ) -> server_error::Result<Output> {
        let interceptor = self
            .plugins
            .get::<PromQueryInterceptorRef<server_error::Error>>();

        self.plugins
            .get::<PermissionCheckerRef>()
            .as_ref()
            .check_permission(query_ctx.current_user(), PermissionReq::PromQuery)
            .context(AuthSnafu)?;

        let stmt = QueryLanguageParser::parse_promql(query, &query_ctx).with_context(|_| {
            ParsePromQLSnafu {
                query: query.clone(),
            }
        })?;

        let plan = self
            .statement_executor
            .plan(&stmt, query_ctx.clone())
            .await
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)?;

        interceptor.pre_execute(query, Some(&plan), query_ctx.clone())?;

        // Take the EvalStmt from the original QueryStatement and use it to create the CatalogQueryStatement.
        let query_statement = if let QueryStatement::Promql(eval_stmt) = stmt {
            CatalogQueryStatement::Promql(eval_stmt)
        } else {
            // It should not happen since the query is already parsed successfully.
            return UnexpectedResultSnafu {
                reason: "The query should always be promql.".to_string(),
            }
            .fail();
        };
        let query = query_statement.to_string();

        let slow_query_timer = self
            .slow_query_options
            .enable
            .then(|| self.event_recorder.clone())
            .flatten()
            .map(|event_recorder| {
                SlowQueryTimer::new(
                    query_statement,
                    self.slow_query_options.threshold,
                    self.slow_query_options.sample_ratio,
                    self.slow_query_options.record_type,
                    event_recorder,
                )
            });

        let ticket = self.process_manager.register_query(
            query_ctx.current_catalog().to_string(),
            vec![query_ctx.current_schema()],
            query,
            query_ctx.conn_info().to_string(),
            Some(query_ctx.process_id()),
            slow_query_timer,
        );

        let query_fut = self.statement_executor.exec_plan(plan, query_ctx.clone());

        let output = CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
            .await
            .map_err(|_| servers::error::CancelledSnafu.build())?
            .map(|output| {
                let Output { meta, data } = output;
                let data = match data {
                    OutputData::Stream(stream) => {
                        OutputData::Stream(Box::pin(CancellableStreamWrapper::new(stream, ticket)))
                    }
                    other => other,
                };
                Output { data, meta }
            })
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)?;

        Ok(interceptor.post_execute(output, query_ctx)?)
    }

    async fn query_metric_names(
        &self,
        matchers: Vec<Matcher>,
        ctx: &QueryContextRef,
    ) -> server_error::Result<Vec<String>> {
        self.handle_query_metric_names(matchers, ctx)
            .await
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)
    }

    async fn query_label_values(
        &self,
        metric: String,
        label_name: String,
        matchers: Vec<Matcher>,
        start: SystemTime,
        end: SystemTime,
        ctx: &QueryContextRef,
    ) -> server_error::Result<Vec<String>> {
        self.handle_query_label_values(metric, label_name, matchers, start, end, ctx)
            .await
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)
    }

    fn catalog_manager(&self) -> CatalogManagerRef {
        self.catalog_manager.clone()
    }
}

/// Validate `stmt.database` permission if it's present.
macro_rules! validate_db_permission {
    ($stmt: expr, $query_ctx: expr) => {
        if let Some(database) = &$stmt.database {
            validate_catalog_and_schema($query_ctx.current_catalog(), database, $query_ctx)
                .map_err(BoxedError::new)
                .context(SqlExecInterceptedSnafu)?;
        }
    };
}

pub fn check_permission(
    plugins: Plugins,
    stmt: &Statement,
    query_ctx: &QueryContextRef,
) -> Result<()> {
    let need_validate = plugins
        .get::<QueryOptions>()
        .map(|opts| opts.disallow_cross_catalog_query)
        .unwrap_or_default();

    if !need_validate {
        return Ok(());
    }

    match stmt {
        // Will be checked in execution.
        // TODO(dennis): add a hook for admin commands.
        Statement::Admin(_) => {}
        // These are executed by the query engine, and will be checked there.
        Statement::Query(_)
        | Statement::Explain(_)
        | Statement::Tql(_)
        | Statement::Delete(_)
        | Statement::DeclareCursor(_)
        | Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(_)) => {}
        // Database operations are not checked here.
        Statement::CreateDatabase(_)
        | Statement::ShowDatabases(_)
        | Statement::DropDatabase(_)
        | Statement::AlterDatabase(_)
        | Statement::DropFlow(_)
        | Statement::Use(_) => {}
        #[cfg(feature = "enterprise")]
        Statement::DropTrigger(_) => {}
        Statement::ShowCreateDatabase(stmt) => {
            validate_database(&stmt.database_name, query_ctx)?;
        }
        Statement::ShowCreateTable(stmt) => {
            validate_param(&stmt.table_name, query_ctx)?;
        }
        Statement::ShowCreateFlow(stmt) => {
            validate_param(&stmt.flow_name, query_ctx)?;
        }
        Statement::ShowCreateView(stmt) => {
            validate_param(&stmt.view_name, query_ctx)?;
        }
        Statement::CreateExternalTable(stmt) => {
            validate_param(&stmt.name, query_ctx)?;
        }
        Statement::CreateFlow(stmt) => {
            // TODO: should the source table names be validated here as well?
            validate_param(&stmt.sink_table_name, query_ctx)?;
        }
        #[cfg(feature = "enterprise")]
        Statement::CreateTrigger(stmt) => {
            validate_param(&stmt.trigger_name, query_ctx)?;
        }
        Statement::CreateView(stmt) => {
            validate_param(&stmt.name, query_ctx)?;
        }
        Statement::AlterTable(stmt) => {
            validate_param(stmt.table_name(), query_ctx)?;
        }
        #[cfg(feature = "enterprise")]
        Statement::AlterTrigger(_) => {}
        // SET/SHOW variables only modify or inspect the current session.
        Statement::SetVariables(_) | Statement::ShowVariables(_) => {}
        // SHOW CHARSET and SHOW COLLATION are not checked.
        Statement::ShowCharset(_) | Statement::ShowCollation(_) => {}

        Statement::Insert(insert) => {
            let name = insert.table_name().context(ParseSqlSnafu)?;
            validate_param(name, query_ctx)?;
        }
        Statement::CreateTable(stmt) => {
            validate_param(&stmt.name, query_ctx)?;
        }
        Statement::CreateTableLike(stmt) => {
            validate_param(&stmt.table_name, query_ctx)?;
            validate_param(&stmt.source_name, query_ctx)?;
        }
        Statement::DropTable(drop_stmt) => {
            for table_name in drop_stmt.table_names() {
                validate_param(table_name, query_ctx)?;
            }
        }
        Statement::DropView(stmt) => {
            validate_param(&stmt.view_name, query_ctx)?;
        }
        Statement::ShowTables(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowTableStatus(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowColumns(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowIndex(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowRegion(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowViews(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowFlows(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        #[cfg(feature = "enterprise")]
        Statement::ShowTriggers(_stmt) => {
            // Triggers are organized at the catalog level, so there is no
            // need to check database (schema) permissions.
        }
        Statement::ShowStatus(_stmt) => {}
        Statement::ShowSearchPath(_stmt) => {}
        Statement::DescribeTable(stmt) => {
            validate_param(stmt.name(), query_ctx)?;
        }
        Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => match stmt {
            CopyTable::To(copy_table_to) => validate_param(&copy_table_to.table_name, query_ctx)?,
            CopyTable::From(copy_table_from) => {
                validate_param(&copy_table_from.table_name, query_ctx)?
            }
        },
        Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
            match copy_database {
                CopyDatabase::To(stmt) => validate_database(&stmt.database_name, query_ctx)?,
                CopyDatabase::From(stmt) => validate_database(&stmt.database_name, query_ctx)?,
            }
        }
        Statement::TruncateTable(stmt) => {
            validate_param(stmt.table_name(), query_ctx)?;
        }
        // Cursor operations are always allowed once the cursor is created.
        Statement::FetchCursor(_) | Statement::CloseCursor(_) => {}
        // Users can only kill processes in their own catalog.
        Statement::Kill(_) => {}
        // SHOW PROCESSLIST
        Statement::ShowProcesslist(_) => {}
    }
    Ok(())
}

fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
    let (catalog, schema, _) = table_idents_to_full_name(name, query_ctx)
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?;

    validate_catalog_and_schema(&catalog, &schema, query_ctx)
        .map_err(BoxedError::new)
        .context(SqlExecInterceptedSnafu)
}

fn validate_database(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
    let (catalog, schema) = match &name.0[..] {
        [schema] => (
            query_ctx.current_catalog().to_string(),
            schema.to_string_unquoted(),
        ),
        [catalog, schema] => (catalog.to_string_unquoted(), schema.to_string_unquoted()),
        _ => InvalidSqlSnafu {
            err_msg: format!(
                "expect database name to be <catalog>.<schema> or <schema>, actual: {name}",
            ),
        }
        .fail()?,
    };

    validate_catalog_and_schema(&catalog, &schema, query_ctx)
        .map_err(BoxedError::new)
        .context(SqlExecInterceptedSnafu)
}

/// Returns whether a query ticket and slow query timer should be created,
/// i.e. whether the statement is a query or another read-only statement.
fn should_capture_statement(stmt: Option<&Statement>) -> bool {
    if let Some(stmt) = stmt {
        matches!(stmt, Statement::Query(_)) || stmt.is_readonly()
    } else {
        false
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::{Arc, Barrier};
    use std::thread;
    use std::time::{Duration, Instant};

    use common_base::Plugins;
    use query::query_engine::options::QueryOptions;
    use session::context::QueryContext;
    use sql::dialect::GreptimeDbDialect;
    use strfmt::Format;

    use super::*;

    #[test]
    fn test_fast_legacy_check_deadlock_prevention() {
        // Create a DashMap to simulate the cache
        let cache = DashMap::new();

        // Pre-populate cache with some entries
        cache.insert("metric1".to_string(), true); // legacy mode
        cache.insert("metric2".to_string(), false); // prom mode
        cache.insert("metric3".to_string(), true); // legacy mode

        // Test case 1: Normal operation with cache hits
        let metric1 = "metric1".to_string();
        let metric4 = "metric4".to_string();
        let names1 = vec![&metric1, &metric4];
        let result = fast_legacy_check(&cache, &names1);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), Some(true)); // should return legacy mode

        // Verify that metric4 was added to cache
        assert!(cache.contains_key("metric4"));
        assert!(*cache.get("metric4").unwrap().value());

        // Test case 2: No cache hits
        let metric5 = "metric5".to_string();
        let metric6 = "metric6".to_string();
        let names2 = vec![&metric5, &metric6];
        let result = fast_legacy_check(&cache, &names2);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), None); // should return None as no cache hits

        // Test case 3: Incompatible modes should return error
        let cache_incompatible = DashMap::new();
        cache_incompatible.insert("metric1".to_string(), true); // legacy
        cache_incompatible.insert("metric2".to_string(), false); // prom
        let metric1_test = "metric1".to_string();
        let metric2_test = "metric2".to_string();
        let names3 = vec![&metric1_test, &metric2_test];
        let result = fast_legacy_check(&cache_incompatible, &names3);
        assert!(result.is_err()); // should error due to incompatible modes

        // Test case 4: Intensive concurrent access to test deadlock prevention
        // This test specifically targets the scenario where multiple threads
        // access the same cache entries simultaneously
        let cache_concurrent = Arc::new(DashMap::new());
        cache_concurrent.insert("shared_metric".to_string(), true);

        let num_threads = 8;
        let operations_per_thread = 100;
        let barrier = Arc::new(Barrier::new(num_threads));
        let success_flag = Arc::new(AtomicBool::new(true));

        let handles: Vec<_> = (0..num_threads)
            .map(|thread_id| {
                let cache_clone = Arc::clone(&cache_concurrent);
                let barrier_clone = Arc::clone(&barrier);
                let success_flag_clone = Arc::clone(&success_flag);

                thread::spawn(move || {
                    // Wait for all threads to be ready
                    barrier_clone.wait();

                    let start_time = Instant::now();
                    for i in 0..operations_per_thread {
                        // Each operation references existing cache entry and adds new ones
                        let shared_metric = "shared_metric".to_string();
                        let new_metric = format!("thread_{}_metric_{}", thread_id, i);
                        let names = vec![&shared_metric, &new_metric];

                        match fast_legacy_check(&cache_clone, &names) {
                            Ok(_) => {}
                            Err(_) => {
                                success_flag_clone.store(false, Ordering::Relaxed);
                                return;
                            }
                        }

                        // If the test takes too long, it likely means deadlock
                        if start_time.elapsed() > Duration::from_secs(10) {
                            success_flag_clone.store(false, Ordering::Relaxed);
                            return;
                        }
                    }
                })
            })
            .collect();

        // Join all threads with timeout
        let start_time = Instant::now();
        for (i, handle) in handles.into_iter().enumerate() {
            let join_result = handle.join();

            // Check if we're taking too long (potential deadlock)
            if start_time.elapsed() > Duration::from_secs(30) {
                panic!("Test timed out - possible deadlock detected!");
            }

            if join_result.is_err() {
                panic!("Thread {} panicked during execution", i);
            }
        }

        // Verify all operations completed successfully
        assert!(
            success_flag.load(Ordering::Relaxed),
            "Some operations failed"
        );

        // Verify that many new entries were added (proving operations completed)
        let final_count = cache_concurrent.len();
        assert!(
            final_count > 1 + num_threads * operations_per_thread / 2,
            "Expected more cache entries, got {}",
            final_count
        );
    }
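
    // A small sketch of the capture rule in `should_capture_statement`:
    // plain queries are captured for process tracking, while DDL is not.
    // The SQL strings here are illustrative only.
    #[test]
    fn test_should_capture_statement() {
        let select = &parse_stmt("SELECT 1;", &GreptimeDbDialect {}).unwrap()[0];
        assert!(should_capture_statement(Some(select)));

        let ddl = &parse_stmt("CREATE DATABASE test;", &GreptimeDbDialect {}).unwrap()[0];
        assert!(!should_capture_statement(Some(ddl)));

        assert!(!should_capture_statement(None));
    }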

    #[test]
    fn test_exec_validation() {
        let query_ctx = QueryContext::arc();
        let plugins: Plugins = Plugins::new();
        plugins.insert(QueryOptions {
            disallow_cross_catalog_query: true,
        });

        let sql = r#"
        SELECT * FROM demo;
        EXPLAIN SELECT * FROM demo;
        CREATE DATABASE test_database;
        SHOW DATABASES;
        "#;
        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        assert_eq!(stmts.len(), 4);
        for stmt in stmts {
            let re = check_permission(plugins.clone(), &stmt, &query_ctx);
            re.unwrap();
        }

        let sql = r#"
        SHOW CREATE TABLE demo;
        ALTER TABLE demo ADD COLUMN new_col INT;
        "#;
        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        assert_eq!(stmts.len(), 2);
        for stmt in stmts {
            let re = check_permission(plugins.clone(), &stmt, &query_ctx);
            re.unwrap();
        }

        fn replace_test(template_sql: &str, plugins: Plugins, query_ctx: &QueryContextRef) {
            // catalog/schema prefixes that should be accepted
            let right = vec![("", ""), ("", "public."), ("greptime.", "public.")];
            for (catalog, schema) in right {
                let sql = do_fmt(template_sql, catalog, schema);
                do_test(&sql, plugins.clone(), query_ctx, true);
            }

            let wrong = vec![
                ("wrongcatalog.", "public."),
                ("wrongcatalog.", "wrongschema."),
            ];
            for (catalog, schema) in wrong {
                let sql = do_fmt(template_sql, catalog, schema);
                do_test(&sql, plugins.clone(), query_ctx, false);
            }
        }

        fn do_fmt(template: &str, catalog: &str, schema: &str) -> String {
            let vars = HashMap::from([
                ("catalog".to_string(), catalog),
                ("schema".to_string(), schema),
            ]);
            template.format(&vars).unwrap()
        }

        fn do_test(sql: &str, plugins: Plugins, query_ctx: &QueryContextRef, is_ok: bool) {
            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
            let re = check_permission(plugins, stmt, query_ctx);
            if is_ok {
                re.unwrap();
            } else {
                assert!(re.is_err());
            }
        }

        // test insert
        let sql = "INSERT INTO {catalog}{schema}monitor(host) VALUES ('host1');";
        replace_test(sql, plugins.clone(), &query_ctx);

        // test create table
        let sql = r#"CREATE TABLE {catalog}{schema}demo(
                            host STRING,
                            ts TIMESTAMP,
                            TIME INDEX (ts),
                            PRIMARY KEY(host)
                        ) engine=mito;"#;
        replace_test(sql, plugins.clone(), &query_ctx);

        // test drop table
        let sql = "DROP TABLE {catalog}{schema}demo;";
        replace_test(sql, plugins.clone(), &query_ctx);

        // test show tables
        let sql = "SHOW TABLES FROM public";
        let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        check_permission(plugins.clone(), &stmt[0], &query_ctx).unwrap();

        let sql = "SHOW TABLES FROM private";
        let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        let re = check_permission(plugins.clone(), &stmt[0], &query_ctx);
        assert!(re.is_ok());

        // test describe table
        let sql = "DESC TABLE {catalog}{schema}demo;";
        replace_test(sql, plugins, &query_ctx);
    }
}