frontend/
instance.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod builder;
16mod grpc;
17mod influxdb;
18mod jaeger;
19mod log_handler;
20mod logs;
21mod opentsdb;
22mod otlp;
23pub mod prom_store;
24mod promql;
25mod region_query;
26pub mod standalone;
27
28use std::pin::Pin;
29use std::sync::Arc;
30use std::time::{Duration, SystemTime};
31
32use async_stream::stream;
33use async_trait::async_trait;
34use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
35use catalog::process_manager::{
36    ProcessManagerRef, QueryStatement as CatalogQueryStatement, SlowQueryTimer,
37};
38use catalog::CatalogManagerRef;
39use client::OutputData;
40use common_base::cancellation::CancellableFuture;
41use common_base::Plugins;
42use common_config::KvBackendConfig;
43use common_error::ext::{BoxedError, ErrorExt};
44use common_event_recorder::EventRecorderRef;
45use common_meta::cache_invalidator::CacheInvalidatorRef;
46use common_meta::key::runtime_switch::RuntimeSwitchManager;
47use common_meta::key::table_name::TableNameKey;
48use common_meta::key::TableMetadataManagerRef;
49use common_meta::kv_backend::KvBackendRef;
50use common_meta::node_manager::NodeManagerRef;
51use common_meta::procedure_executor::ProcedureExecutorRef;
52use common_meta::state_store::KvStateStore;
53use common_procedure::local::{LocalManager, ManagerConfig};
54use common_procedure::options::ProcedureConfig;
55use common_procedure::ProcedureManagerRef;
56use common_query::Output;
57use common_recordbatch::error::StreamTimeoutSnafu;
58use common_recordbatch::RecordBatchStreamWrapper;
59use common_telemetry::logging::SlowQueryOptions;
60use common_telemetry::{debug, error, info, tracing};
61use dashmap::DashMap;
62use datafusion_expr::LogicalPlan;
63use futures::{Stream, StreamExt};
64use lazy_static::lazy_static;
65use log_store::raft_engine::RaftEngineBackend;
66use operator::delete::DeleterRef;
67use operator::insert::InserterRef;
68use operator::statement::{StatementExecutor, StatementExecutorRef};
69use partition::manager::PartitionRuleManagerRef;
70use pipeline::pipeline_operator::PipelineOperator;
71use prometheus::HistogramTimer;
72use promql_parser::label::Matcher;
73use query::metrics::OnDone;
74use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
75use query::query_engine::options::{validate_catalog_and_schema, QueryOptions};
76use query::query_engine::DescribeResult;
77use query::QueryEngineRef;
78use servers::error::{
79    self as server_error, AuthSnafu, CommonMetaSnafu, ExecuteQuerySnafu,
80    OtlpMetricModeIncompatibleSnafu, ParsePromQLSnafu, UnexpectedResultSnafu,
81};
82use servers::interceptor::{
83    PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
84};
85use servers::otlp::metrics::legacy_normalize_otlp_name;
86use servers::prometheus_handler::PrometheusHandler;
87use servers::query_handler::sql::SqlQueryHandler;
88use session::context::{Channel, QueryContextRef};
89use session::table_name::table_idents_to_full_name;
90use snafu::prelude::*;
91use sql::ast::ObjectNamePartExt;
92use sql::dialect::Dialect;
93use sql::parser::{ParseOptions, ParserContext};
94use sql::statements::copy::{CopyDatabase, CopyTable};
95use sql::statements::statement::Statement;
96use sql::statements::tql::Tql;
97use sqlparser::ast::ObjectName;
98pub use standalone::StandaloneDatanodeManager;
99use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM};
100
101use crate::error::{
102    self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, InvalidSqlSnafu,
103    ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
104    StatementTimeoutSnafu, TableOperationSnafu,
105};
106use crate::limiter::LimiterRef;
107use crate::stream_wrapper::CancellableStreamWrapper;
108
109lazy_static! {
110    static ref OTLP_LEGACY_DEFAULT_VALUE: String = "legacy".to_string();
111}
112
/// The frontend instance contains necessary components, and implements many
/// traits, like [`servers::query_handler::grpc::GrpcQueryHandler`],
/// [`servers::query_handler::sql::SqlQueryHandler`], etc.
#[derive(Clone)]
pub struct Instance {
    catalog_manager: CatalogManagerRef,
    pipeline_operator: Arc<PipelineOperator>,
    /// Plans and executes statements; also provides the cache invalidator and procedure executor.
    statement_executor: Arc<StatementExecutor>,
    query_engine: QueryEngineRef,
    /// Plugin registry; interceptors, permission checkers and `QueryOptions` are looked up here.
    plugins: Plugins,
    /// Also the source of the node manager and partition manager accessors.
    inserter: InserterRef,
    deleter: DeleterRef,
    table_metadata_manager: TableMetadataManagerRef,
    /// Required (together with `slow_query_options.enable`) to create slow-query timers.
    event_recorder: Option<EventRecorderRef>,
    limiter: Option<LimiterRef>,
    /// Running queries are registered here so they can be cancelled through their ticket.
    process_manager: ProcessManagerRef,
    slow_query_options: SlowQueryOptions,

    // Cache of the OTLP metric mode, filled by `check_otlp_legacy`:
    // - outer key: the database string of the query context (`get_db_string`)
    // - inner key: the metric name as received (before legacy normalization)
    // - value: `true` if the metric runs in legacy mode, `false` for the prom-compatible mode
    // NOTE(review): `#[derive(Clone)]` deep-copies this `DashMap`, so cloned instances do not
    // share cache updates; confirm this is intended.
    otlp_metrics_table_legacy_cache: DashMap<String, DashMap<String, bool>>,
}
137
138impl Instance {
139    pub async fn try_build_standalone_components(
140        dir: String,
141        kv_backend_config: KvBackendConfig,
142        procedure_config: ProcedureConfig,
143    ) -> Result<(KvBackendRef, ProcedureManagerRef)> {
144        info!(
145            "Creating metadata kvbackend with config: {:?}",
146            kv_backend_config
147        );
148        let kv_backend = RaftEngineBackend::try_open_with_cfg(dir, &kv_backend_config)
149            .map_err(BoxedError::new)
150            .context(error::OpenRaftEngineBackendSnafu)?;
151
152        let kv_backend = Arc::new(kv_backend);
153        let kv_state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
154
155        let manager_config = ManagerConfig {
156            max_retry_times: procedure_config.max_retry_times,
157            retry_delay: procedure_config.retry_delay,
158            max_running_procedures: procedure_config.max_running_procedures,
159            ..Default::default()
160        };
161        let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend.clone()));
162        let procedure_manager = Arc::new(LocalManager::new(
163            manager_config,
164            kv_state_store.clone(),
165            kv_state_store,
166            Some(runtime_switch_manager),
167            None,
168        ));
169
170        Ok((kv_backend, procedure_manager))
171    }
172
173    pub fn catalog_manager(&self) -> &CatalogManagerRef {
174        &self.catalog_manager
175    }
176
177    pub fn query_engine(&self) -> &QueryEngineRef {
178        &self.query_engine
179    }
180
181    pub fn plugins(&self) -> &Plugins {
182        &self.plugins
183    }
184
185    pub fn statement_executor(&self) -> &StatementExecutorRef {
186        &self.statement_executor
187    }
188
189    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
190        &self.table_metadata_manager
191    }
192
193    pub fn inserter(&self) -> &InserterRef {
194        &self.inserter
195    }
196
197    pub fn process_manager(&self) -> &ProcessManagerRef {
198        &self.process_manager
199    }
200
201    pub fn node_manager(&self) -> &NodeManagerRef {
202        self.inserter.node_manager()
203    }
204
205    pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
206        self.inserter.partition_manager()
207    }
208
209    pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
210        self.statement_executor.cache_invalidator()
211    }
212
213    pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
214        self.statement_executor.procedure_executor()
215    }
216}
217
218fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
219    ParserContext::create_with_dialect(sql, dialect, ParseOptions::default()).context(ParseSqlSnafu)
220}
221
222impl Instance {
223    async fn query_statement(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
224        check_permission(self.plugins.clone(), &stmt, &query_ctx)?;
225
226        let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
227        let query_interceptor = query_interceptor.as_ref();
228
229        if should_capture_statement(Some(&stmt)) {
230            let slow_query_timer = self
231                .slow_query_options
232                .enable
233                .then(|| self.event_recorder.clone())
234                .flatten()
235                .map(|event_recorder| {
236                    SlowQueryTimer::new(
237                        CatalogQueryStatement::Sql(stmt.clone()),
238                        self.slow_query_options.threshold,
239                        self.slow_query_options.sample_ratio,
240                        self.slow_query_options.record_type,
241                        event_recorder,
242                    )
243                });
244
245            let ticket = self.process_manager.register_query(
246                query_ctx.current_catalog().to_string(),
247                vec![query_ctx.current_schema()],
248                stmt.to_string(),
249                query_ctx.conn_info().to_string(),
250                Some(query_ctx.process_id()),
251                slow_query_timer,
252            );
253
254            let query_fut = self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor);
255
256            CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
257                .await
258                .map_err(|_| error::CancelledSnafu.build())?
259                .map(|output| {
260                    let Output { meta, data } = output;
261
262                    let data = match data {
263                        OutputData::Stream(stream) => OutputData::Stream(Box::pin(
264                            CancellableStreamWrapper::new(stream, ticket),
265                        )),
266                        other => other,
267                    };
268                    Output { data, meta }
269                })
270        } else {
271            self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor)
272                .await
273        }
274    }
275
276    async fn exec_statement_with_timeout(
277        &self,
278        stmt: Statement,
279        query_ctx: QueryContextRef,
280        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
281    ) -> Result<Output> {
282        let timeout = derive_timeout(&stmt, &query_ctx);
283        match timeout {
284            Some(timeout) => {
285                let start = tokio::time::Instant::now();
286                let output = tokio::time::timeout(
287                    timeout,
288                    self.exec_statement(stmt, query_ctx, query_interceptor),
289                )
290                .await
291                .map_err(|_| StatementTimeoutSnafu.build())??;
292                // compute remaining timeout
293                let remaining_timeout = timeout.checked_sub(start.elapsed()).unwrap_or_default();
294                attach_timeout(output, remaining_timeout)
295            }
296            None => {
297                self.exec_statement(stmt, query_ctx, query_interceptor)
298                    .await
299            }
300        }
301    }
302
303    async fn exec_statement(
304        &self,
305        stmt: Statement,
306        query_ctx: QueryContextRef,
307        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
308    ) -> Result<Output> {
309        match stmt {
310            Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
311                // TODO: remove this when format is supported in datafusion
312                if let Statement::Explain(explain) = &stmt {
313                    if let Some(format) = explain.format() {
314                        query_ctx.set_explain_format(format.to_string());
315                    }
316                }
317
318                self.plan_and_exec_sql(stmt, &query_ctx, query_interceptor)
319                    .await
320            }
321            Statement::Tql(tql) => {
322                self.plan_and_exec_tql(&query_ctx, query_interceptor, tql)
323                    .await
324            }
325            _ => {
326                query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?;
327                self.statement_executor
328                    .execute_sql(stmt, query_ctx)
329                    .await
330                    .context(TableOperationSnafu)
331            }
332        }
333    }
334
335    async fn plan_and_exec_sql(
336        &self,
337        stmt: Statement,
338        query_ctx: &QueryContextRef,
339        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
340    ) -> Result<Output> {
341        let stmt = QueryStatement::Sql(stmt);
342        let plan = self
343            .statement_executor
344            .plan(&stmt, query_ctx.clone())
345            .await?;
346        let QueryStatement::Sql(stmt) = stmt else {
347            unreachable!()
348        };
349        query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?;
350        self.statement_executor
351            .exec_plan(plan, query_ctx.clone())
352            .await
353            .context(TableOperationSnafu)
354    }
355
356    async fn plan_and_exec_tql(
357        &self,
358        query_ctx: &QueryContextRef,
359        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
360        tql: Tql,
361    ) -> Result<Output> {
362        let plan = self
363            .statement_executor
364            .plan_tql(tql.clone(), query_ctx)
365            .await?;
366        query_interceptor.pre_execute(&Statement::Tql(tql), Some(&plan), query_ctx.clone())?;
367        self.statement_executor
368            .exec_plan(plan, query_ctx.clone())
369            .await
370            .context(TableOperationSnafu)
371    }
372
373    async fn check_otlp_legacy(
374        &self,
375        names: &[&String],
376        ctx: QueryContextRef,
377    ) -> server_error::Result<bool> {
378        let db_string = ctx.get_db_string();
379        let cache = self
380            .otlp_metrics_table_legacy_cache
381            .entry(db_string)
382            .or_default();
383
384        // check cache
385        let hit_cache = names
386            .iter()
387            .filter_map(|name| cache.get(*name))
388            .collect::<Vec<_>>();
389        if !hit_cache.is_empty() {
390            let hit_legacy = hit_cache.iter().any(|en| *en.value());
391            let hit_prom = hit_cache.iter().any(|en| !*en.value());
392
393            // hit but have true and false, means both legacy and new mode are used
394            // we cannot handle this case, so return error
395            // add doc links in err msg later
396            ensure!(!(hit_legacy && hit_prom), OtlpMetricModeIncompatibleSnafu);
397
398            let flag = hit_legacy;
399            // set cache for all names
400            names.iter().for_each(|name| {
401                if !cache.contains_key(*name) {
402                    cache.insert(name.to_string(), flag);
403                }
404            });
405            return Ok(flag);
406        }
407
408        let catalog = ctx.current_catalog();
409        let schema = ctx.current_schema();
410
411        // query legacy table names
412        let normalized_names = names
413            .iter()
414            .map(|n| legacy_normalize_otlp_name(n))
415            .collect::<Vec<_>>();
416        let table_names = normalized_names
417            .iter()
418            .map(|n| TableNameKey::new(catalog, &schema, n))
419            .collect::<Vec<_>>();
420        let table_values = self
421            .table_metadata_manager()
422            .table_name_manager()
423            .batch_get(table_names)
424            .await
425            .context(CommonMetaSnafu)?;
426        let table_ids = table_values
427            .into_iter()
428            .filter_map(|v| v.map(|vi| vi.table_id()))
429            .collect::<Vec<_>>();
430
431        // means no existing table is found, use new mode
432        if table_ids.is_empty() {
433            // set cache
434            names.iter().for_each(|name| {
435                cache.insert(name.to_string(), false);
436            });
437            return Ok(false);
438        }
439
440        // has existing table, check table options
441        let table_infos = self
442            .table_metadata_manager()
443            .table_info_manager()
444            .batch_get(&table_ids)
445            .await
446            .context(CommonMetaSnafu)?;
447        let options = table_infos
448            .values()
449            .map(|info| {
450                info.table_info
451                    .meta
452                    .options
453                    .extra_options
454                    .get(OTLP_METRIC_COMPAT_KEY)
455                    .unwrap_or(&OTLP_LEGACY_DEFAULT_VALUE)
456            })
457            .collect::<Vec<_>>();
458        if !options.is_empty() {
459            // check value consistency
460            let has_prom = options.iter().any(|opt| *opt == OTLP_METRIC_COMPAT_PROM);
461            let has_legacy = options
462                .iter()
463                .any(|opt| *opt == OTLP_LEGACY_DEFAULT_VALUE.as_str());
464            ensure!(!(has_prom && has_legacy), OtlpMetricModeIncompatibleSnafu);
465            let flag = has_legacy;
466            names.iter().for_each(|name| {
467                cache.insert(name.to_string(), flag);
468            });
469            Ok(flag)
470        } else {
471            // no table info, use new mode
472            names.iter().for_each(|name| {
473                cache.insert(name.to_string(), false);
474            });
475            Ok(false)
476        }
477    }
478}
479
480/// If the relevant variables are set, the timeout is enforced for all PostgreSQL statements.
481/// For MySQL, it applies only to read-only statements.
482fn derive_timeout(stmt: &Statement, query_ctx: &QueryContextRef) -> Option<Duration> {
483    let query_timeout = query_ctx.query_timeout()?;
484    if query_timeout.is_zero() {
485        return None;
486    }
487    match query_ctx.channel() {
488        Channel::Mysql if stmt.is_readonly() => Some(query_timeout),
489        Channel::Postgres => Some(query_timeout),
490        _ => None,
491    }
492}
493
494fn attach_timeout(output: Output, mut timeout: Duration) -> Result<Output> {
495    if timeout.is_zero() {
496        return StatementTimeoutSnafu.fail();
497    }
498
499    let output = match output.data {
500        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
501        OutputData::Stream(mut stream) => {
502            let schema = stream.schema();
503            let s = Box::pin(stream! {
504                let mut start = tokio::time::Instant::now();
505                while let Some(item) = tokio::time::timeout(timeout, stream.next()).await.map_err(|_| StreamTimeoutSnafu.build())? {
506                    yield item;
507
508                    let now = tokio::time::Instant::now();
509                    timeout = timeout.checked_sub(now - start).unwrap_or(Duration::ZERO);
510                    start = now;
511                    // tokio::time::timeout may not return an error immediately when timeout is 0.
512                    if timeout.is_zero() {
513                        StreamTimeoutSnafu.fail()?;
514                    }
515                }
516            }) as Pin<Box<dyn Stream<Item = _> + Send>>;
517            let stream = RecordBatchStreamWrapper {
518                schema,
519                stream: s,
520                output_ordering: None,
521                metrics: Default::default(),
522            };
523            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
524        }
525    };
526
527    Ok(output)
528}
529
530#[async_trait]
531impl SqlQueryHandler for Instance {
532    type Error = Error;
533
534    #[tracing::instrument(skip_all)]
535    async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
536        let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
537        let query_interceptor = query_interceptor_opt.as_ref();
538        let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) {
539            Ok(q) => q,
540            Err(e) => return vec![Err(e)],
541        };
542
543        let checker_ref = self.plugins.get::<PermissionCheckerRef>();
544        let checker = checker_ref.as_ref();
545
546        match parse_stmt(query.as_ref(), query_ctx.sql_dialect())
547            .and_then(|stmts| query_interceptor.post_parsing(stmts, query_ctx.clone()))
548        {
549            Ok(stmts) => {
550                if stmts.is_empty() {
551                    return vec![InvalidSqlSnafu {
552                        err_msg: "empty statements",
553                    }
554                    .fail()];
555                }
556
557                let mut results = Vec::with_capacity(stmts.len());
558                for stmt in stmts {
559                    if let Err(e) = checker
560                        .check_permission(
561                            query_ctx.current_user(),
562                            PermissionReq::SqlStatement(&stmt),
563                        )
564                        .context(PermissionSnafu)
565                    {
566                        results.push(Err(e));
567                        break;
568                    }
569
570                    match self.query_statement(stmt.clone(), query_ctx.clone()).await {
571                        Ok(output) => {
572                            let output_result =
573                                query_interceptor.post_execute(output, query_ctx.clone());
574                            results.push(output_result);
575                        }
576                        Err(e) => {
577                            if e.status_code().should_log_error() {
578                                error!(e; "Failed to execute query: {stmt}");
579                            } else {
580                                debug!("Failed to execute query: {stmt}, {e}");
581                            }
582                            results.push(Err(e));
583                            break;
584                        }
585                    }
586                }
587                results
588            }
589            Err(e) => {
590                vec![Err(e)]
591            }
592        }
593    }
594
595    async fn do_exec_plan(
596        &self,
597        stmt: Option<Statement>,
598        plan: LogicalPlan,
599        query_ctx: QueryContextRef,
600    ) -> Result<Output> {
601        if should_capture_statement(stmt.as_ref()) {
602            // It's safe to unwrap here because we've already checked the type.
603            let stmt = stmt.unwrap();
604            let query = stmt.to_string();
605            let slow_query_timer = self
606                .slow_query_options
607                .enable
608                .then(|| self.event_recorder.clone())
609                .flatten()
610                .map(|event_recorder| {
611                    SlowQueryTimer::new(
612                        CatalogQueryStatement::Sql(stmt.clone()),
613                        self.slow_query_options.threshold,
614                        self.slow_query_options.sample_ratio,
615                        self.slow_query_options.record_type,
616                        event_recorder,
617                    )
618                });
619
620            let ticket = self.process_manager.register_query(
621                query_ctx.current_catalog().to_string(),
622                vec![query_ctx.current_schema()],
623                query,
624                query_ctx.conn_info().to_string(),
625                Some(query_ctx.process_id()),
626                slow_query_timer,
627            );
628
629            let query_fut = self.query_engine.execute(plan.clone(), query_ctx);
630
631            CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
632                .await
633                .map_err(|_| error::CancelledSnafu.build())?
634                .map(|output| {
635                    let Output { meta, data } = output;
636
637                    let data = match data {
638                        OutputData::Stream(stream) => OutputData::Stream(Box::pin(
639                            CancellableStreamWrapper::new(stream, ticket),
640                        )),
641                        other => other,
642                    };
643                    Output { data, meta }
644                })
645                .context(ExecLogicalPlanSnafu)
646        } else {
647            // plan should be prepared before exec
648            // we'll do check there
649            self.query_engine
650                .execute(plan.clone(), query_ctx)
651                .await
652                .context(ExecLogicalPlanSnafu)
653        }
654    }
655
656    #[tracing::instrument(skip_all)]
657    async fn do_promql_query(
658        &self,
659        query: &PromQuery,
660        query_ctx: QueryContextRef,
661    ) -> Vec<Result<Output>> {
662        // check will be done in prometheus handler's do_query
663        let result = PrometheusHandler::do_query(self, query, query_ctx)
664            .await
665            .with_context(|_| ExecutePromqlSnafu {
666                query: format!("{query:?}"),
667            });
668        vec![result]
669    }
670
671    async fn do_describe(
672        &self,
673        stmt: Statement,
674        query_ctx: QueryContextRef,
675    ) -> Result<Option<DescribeResult>> {
676        if matches!(
677            stmt,
678            Statement::Insert(_) | Statement::Query(_) | Statement::Delete(_)
679        ) {
680            self.plugins
681                .get::<PermissionCheckerRef>()
682                .as_ref()
683                .check_permission(query_ctx.current_user(), PermissionReq::SqlStatement(&stmt))
684                .context(PermissionSnafu)?;
685
686            let plan = self
687                .query_engine
688                .planner()
689                .plan(&QueryStatement::Sql(stmt), query_ctx.clone())
690                .await
691                .context(PlanStatementSnafu)?;
692            self.query_engine
693                .describe(plan, query_ctx)
694                .await
695                .map(Some)
696                .context(error::DescribeStatementSnafu)
697        } else {
698            Ok(None)
699        }
700    }
701
702    async fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result<bool> {
703        self.catalog_manager
704            .schema_exists(catalog, schema, None)
705            .await
706            .context(error::CatalogSnafu)
707    }
708}
709
710/// Attaches a timer to the output and observes it once the output is exhausted.
711pub fn attach_timer(output: Output, timer: HistogramTimer) -> Output {
712    match output.data {
713        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
714        OutputData::Stream(stream) => {
715            let stream = OnDone::new(stream, move || {
716                timer.observe_duration();
717            });
718            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
719        }
720    }
721}
722
723#[async_trait]
724impl PrometheusHandler for Instance {
725    #[tracing::instrument(skip_all)]
726    async fn do_query(
727        &self,
728        query: &PromQuery,
729        query_ctx: QueryContextRef,
730    ) -> server_error::Result<Output> {
731        let interceptor = self
732            .plugins
733            .get::<PromQueryInterceptorRef<server_error::Error>>();
734
735        self.plugins
736            .get::<PermissionCheckerRef>()
737            .as_ref()
738            .check_permission(query_ctx.current_user(), PermissionReq::PromQuery)
739            .context(AuthSnafu)?;
740
741        let stmt = QueryLanguageParser::parse_promql(query, &query_ctx).with_context(|_| {
742            ParsePromQLSnafu {
743                query: query.clone(),
744            }
745        })?;
746
747        let plan = self
748            .statement_executor
749            .plan(&stmt, query_ctx.clone())
750            .await
751            .map_err(BoxedError::new)
752            .context(ExecuteQuerySnafu)?;
753
754        interceptor.pre_execute(query, Some(&plan), query_ctx.clone())?;
755
756        // Take the EvalStmt from the original QueryStatement and use it to create the CatalogQueryStatement.
757        let query_statement = if let QueryStatement::Promql(eval_stmt) = stmt {
758            CatalogQueryStatement::Promql(eval_stmt)
759        } else {
760            // It should not happen since the query is already parsed successfully.
761            return UnexpectedResultSnafu {
762                reason: "The query should always be promql.".to_string(),
763            }
764            .fail();
765        };
766        let query = query_statement.to_string();
767
768        let slow_query_timer = self
769            .slow_query_options
770            .enable
771            .then(|| self.event_recorder.clone())
772            .flatten()
773            .map(|event_recorder| {
774                SlowQueryTimer::new(
775                    query_statement,
776                    self.slow_query_options.threshold,
777                    self.slow_query_options.sample_ratio,
778                    self.slow_query_options.record_type,
779                    event_recorder,
780                )
781            });
782
783        let ticket = self.process_manager.register_query(
784            query_ctx.current_catalog().to_string(),
785            vec![query_ctx.current_schema()],
786            query,
787            query_ctx.conn_info().to_string(),
788            Some(query_ctx.process_id()),
789            slow_query_timer,
790        );
791
792        let query_fut = self.statement_executor.exec_plan(plan, query_ctx.clone());
793
794        let output = CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
795            .await
796            .map_err(|_| servers::error::CancelledSnafu.build())?
797            .map(|output| {
798                let Output { meta, data } = output;
799                let data = match data {
800                    OutputData::Stream(stream) => {
801                        OutputData::Stream(Box::pin(CancellableStreamWrapper::new(stream, ticket)))
802                    }
803                    other => other,
804                };
805                Output { data, meta }
806            })
807            .map_err(BoxedError::new)
808            .context(ExecuteQuerySnafu)?;
809
810        Ok(interceptor.post_execute(output, query_ctx)?)
811    }
812
813    async fn query_metric_names(
814        &self,
815        matchers: Vec<Matcher>,
816        ctx: &QueryContextRef,
817    ) -> server_error::Result<Vec<String>> {
818        self.handle_query_metric_names(matchers, ctx)
819            .await
820            .map_err(BoxedError::new)
821            .context(ExecuteQuerySnafu)
822    }
823
824    async fn query_label_values(
825        &self,
826        metric: String,
827        label_name: String,
828        matchers: Vec<Matcher>,
829        start: SystemTime,
830        end: SystemTime,
831        ctx: &QueryContextRef,
832    ) -> server_error::Result<Vec<String>> {
833        self.handle_query_label_values(metric, label_name, matchers, start, end, ctx)
834            .await
835            .map_err(BoxedError::new)
836            .context(ExecuteQuerySnafu)
837    }
838
839    fn catalog_manager(&self) -> CatalogManagerRef {
840        self.catalog_manager.clone()
841    }
842}
843
/// Validates `stmt.database` when the statement names a database; otherwise does nothing.
///
/// The named database is checked as a schema of the session's current catalog
/// via `validate_catalog_and_schema`. The expansion propagates failures with
/// `?`, so it must be used inside a function returning a compatible `Result`.
macro_rules! validate_db_permission {
    ($stmt: expr, $query_ctx: expr) => {
        if let Some(database) = $stmt.database.as_ref() {
            validate_catalog_and_schema($query_ctx.current_catalog(), database, $query_ctx)
                .map_err(BoxedError::new)
                .context(SqlExecInterceptedSnafu)?;
        }
    };
}
853}
854
855pub fn check_permission(
856    plugins: Plugins,
857    stmt: &Statement,
858    query_ctx: &QueryContextRef,
859) -> Result<()> {
860    let need_validate = plugins
861        .get::<QueryOptions>()
862        .map(|opts| opts.disallow_cross_catalog_query)
863        .unwrap_or_default();
864
865    if !need_validate {
866        return Ok(());
867    }
868
869    match stmt {
870        // Will be checked in execution.
871        // TODO(dennis): add a hook for admin commands.
872        Statement::Admin(_) => {}
873        // These are executed by query engine, and will be checked there.
874        Statement::Query(_)
875        | Statement::Explain(_)
876        | Statement::Tql(_)
877        | Statement::Delete(_)
878        | Statement::DeclareCursor(_)
879        | Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(_)) => {}
880        // database ops won't be checked
881        Statement::CreateDatabase(_)
882        | Statement::ShowDatabases(_)
883        | Statement::DropDatabase(_)
884        | Statement::AlterDatabase(_)
885        | Statement::DropFlow(_)
886        | Statement::Use(_) => {}
887        #[cfg(feature = "enterprise")]
888        Statement::DropTrigger(_) => {}
889        Statement::ShowCreateDatabase(stmt) => {
890            validate_database(&stmt.database_name, query_ctx)?;
891        }
892        Statement::ShowCreateTable(stmt) => {
893            validate_param(&stmt.table_name, query_ctx)?;
894        }
895        Statement::ShowCreateFlow(stmt) => {
896            validate_param(&stmt.flow_name, query_ctx)?;
897        }
898        Statement::ShowCreateView(stmt) => {
899            validate_param(&stmt.view_name, query_ctx)?;
900        }
901        Statement::CreateExternalTable(stmt) => {
902            validate_param(&stmt.name, query_ctx)?;
903        }
904        Statement::CreateFlow(stmt) => {
905            // TODO: should also validate source table name here?
906            validate_param(&stmt.sink_table_name, query_ctx)?;
907        }
908        #[cfg(feature = "enterprise")]
909        Statement::CreateTrigger(stmt) => {
910            validate_param(&stmt.trigger_name, query_ctx)?;
911        }
912        Statement::CreateView(stmt) => {
913            validate_param(&stmt.name, query_ctx)?;
914        }
915        Statement::AlterTable(stmt) => {
916            validate_param(stmt.table_name(), query_ctx)?;
917        }
918        #[cfg(feature = "enterprise")]
919        Statement::AlterTrigger(_) => {}
920        // set/show variable now only alter/show variable in session
921        Statement::SetVariables(_) | Statement::ShowVariables(_) => {}
922        // show charset and show collation won't be checked
923        Statement::ShowCharset(_) | Statement::ShowCollation(_) => {}
924
925        Statement::Insert(insert) => {
926            let name = insert.table_name().context(ParseSqlSnafu)?;
927            validate_param(name, query_ctx)?;
928        }
929        Statement::CreateTable(stmt) => {
930            validate_param(&stmt.name, query_ctx)?;
931        }
932        Statement::CreateTableLike(stmt) => {
933            validate_param(&stmt.table_name, query_ctx)?;
934            validate_param(&stmt.source_name, query_ctx)?;
935        }
936        Statement::DropTable(drop_stmt) => {
937            for table_name in drop_stmt.table_names() {
938                validate_param(table_name, query_ctx)?;
939            }
940        }
941        Statement::DropView(stmt) => {
942            validate_param(&stmt.view_name, query_ctx)?;
943        }
944        Statement::ShowTables(stmt) => {
945            validate_db_permission!(stmt, query_ctx);
946        }
947        Statement::ShowTableStatus(stmt) => {
948            validate_db_permission!(stmt, query_ctx);
949        }
950        Statement::ShowColumns(stmt) => {
951            validate_db_permission!(stmt, query_ctx);
952        }
953        Statement::ShowIndex(stmt) => {
954            validate_db_permission!(stmt, query_ctx);
955        }
956        Statement::ShowRegion(stmt) => {
957            validate_db_permission!(stmt, query_ctx);
958        }
959        Statement::ShowViews(stmt) => {
960            validate_db_permission!(stmt, query_ctx);
961        }
962        Statement::ShowFlows(stmt) => {
963            validate_db_permission!(stmt, query_ctx);
964        }
965        #[cfg(feature = "enterprise")]
966        Statement::ShowTriggers(_stmt) => {
967            // The trigger is organized based on the catalog dimension, so there
968            // is no need to check the permission of the database(schema).
969        }
970        Statement::ShowStatus(_stmt) => {}
971        Statement::ShowSearchPath(_stmt) => {}
972        Statement::DescribeTable(stmt) => {
973            validate_param(stmt.name(), query_ctx)?;
974        }
975        Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => match stmt {
976            CopyTable::To(copy_table_to) => validate_param(&copy_table_to.table_name, query_ctx)?,
977            CopyTable::From(copy_table_from) => {
978                validate_param(&copy_table_from.table_name, query_ctx)?
979            }
980        },
981        Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
982            match copy_database {
983                CopyDatabase::To(stmt) => validate_database(&stmt.database_name, query_ctx)?,
984                CopyDatabase::From(stmt) => validate_database(&stmt.database_name, query_ctx)?,
985            }
986        }
987        Statement::TruncateTable(stmt) => {
988            validate_param(stmt.table_name(), query_ctx)?;
989        }
990        // cursor operations are always allowed once it's created
991        Statement::FetchCursor(_) | Statement::CloseCursor(_) => {}
992        // User can only kill process in their own catalog.
993        Statement::Kill(_) => {}
994        // SHOW PROCESSLIST
995        Statement::ShowProcesslist(_) => {}
996    }
997    Ok(())
998}
999
1000fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
1001    let (catalog, schema, _) = table_idents_to_full_name(name, query_ctx)
1002        .map_err(BoxedError::new)
1003        .context(ExternalSnafu)?;
1004
1005    validate_catalog_and_schema(&catalog, &schema, query_ctx)
1006        .map_err(BoxedError::new)
1007        .context(SqlExecInterceptedSnafu)
1008}
1009
1010fn validate_database(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
1011    let (catalog, schema) = match &name.0[..] {
1012        [schema] => (
1013            query_ctx.current_catalog().to_string(),
1014            schema.to_string_unquoted(),
1015        ),
1016        [catalog, schema] => (catalog.to_string_unquoted(), schema.to_string_unquoted()),
1017        _ => InvalidSqlSnafu {
1018            err_msg: format!(
1019                "expect database name to be <catalog>.<schema> or <schema>, actual: {name}",
1020            ),
1021        }
1022        .fail()?,
1023    };
1024
1025    validate_catalog_and_schema(&catalog, &schema, query_ctx)
1026        .map_err(BoxedError::new)
1027        .context(SqlExecInterceptedSnafu)
1028}
1029
1030// Create a query ticket and slow query timer if the statement is a query or readonly statement.
1031fn should_capture_statement(stmt: Option<&Statement>) -> bool {
1032    if let Some(stmt) = stmt {
1033        matches!(stmt, Statement::Query(_)) || stmt.is_readonly()
1034    } else {
1035        false
1036    }
1037}
1038
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use common_base::Plugins;
    use query::query_engine::options::QueryOptions;
    use session::context::QueryContext;
    use sql::dialect::GreptimeDbDialect;
    use strfmt::Format;

    use super::*;

    /// `{catalog}`/`{schema}` substitutions that must pass the check.
    const ACCEPTED_PREFIXES: [(&str, &str); 3] =
        [("", ""), ("", "public."), ("greptime.", "public.")];

    /// `{catalog}`/`{schema}` substitutions naming a foreign catalog; must be rejected.
    const REJECTED_PREFIXES: [(&str, &str); 2] = [
        ("wrongcatalog.", "public."),
        ("wrongcatalog.", "wrongschema."),
    ];

    /// Plugin set with cross-catalog queries disallowed, so checks actually run.
    fn restricted_plugins() -> Plugins {
        let plugins = Plugins::new();
        plugins.insert(QueryOptions {
            disallow_cross_catalog_query: true,
        });
        plugins
    }

    /// Substitutes the `{catalog}` and `{schema}` placeholders in `template`.
    fn fill(template: &str, catalog: &str, schema: &str) -> String {
        let vars = HashMap::from([
            ("catalog".to_string(), catalog),
            ("schema".to_string(), schema),
        ]);
        template.format(&vars).unwrap()
    }

    /// Parses `sql` and checks its first statement, expecting success iff `should_pass`.
    fn expect_check(
        sql: &str,
        plugins: &Plugins,
        query_ctx: &QueryContextRef,
        should_pass: bool,
    ) {
        let parsed = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        let outcome = check_permission(plugins.clone(), &parsed[0], query_ctx);
        if should_pass {
            outcome.unwrap();
        } else {
            assert!(outcome.is_err());
        }
    }

    /// Parses `sql`, asserts it yields `expected` statements, and requires all to pass.
    fn expect_all_pass(
        sql: &str,
        expected: usize,
        plugins: &Plugins,
        query_ctx: &QueryContextRef,
    ) {
        let parsed = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        assert_eq!(parsed.len(), expected);
        parsed
            .iter()
            .for_each(|stmt| check_permission(plugins.clone(), stmt, query_ctx).unwrap());
    }

    /// Expands `template` with every accepted and rejected prefix and checks each outcome.
    fn expect_catalog_scoped(template: &str, plugins: &Plugins, query_ctx: &QueryContextRef) {
        for (catalog, schema) in ACCEPTED_PREFIXES {
            expect_check(&fill(template, catalog, schema), plugins, query_ctx, true);
        }
        for (catalog, schema) in REJECTED_PREFIXES {
            expect_check(&fill(template, catalog, schema), plugins, query_ctx, false);
        }
    }

    #[test]
    fn test_exec_validation() {
        let query_ctx = QueryContext::arc();
        let plugins = restricted_plugins();

        // Statements that are not checked here at all.
        let sql = r#"
        SELECT * FROM demo;
        EXPLAIN SELECT * FROM demo;
        CREATE DATABASE test_database;
        SHOW DATABASES;
        "#;
        expect_all_pass(sql, 4, &plugins, &query_ctx);

        // Unqualified table names pass.
        let sql = r#"
        SHOW CREATE TABLE demo;
        ALTER TABLE demo ADD COLUMN new_col INT;
        "#;
        expect_all_pass(sql, 2, &plugins, &query_ctx);

        // insert
        expect_catalog_scoped(
            "INSERT INTO {catalog}{schema}monitor(host) VALUES ('host1');",
            &plugins,
            &query_ctx,
        );

        // create table
        let sql = r#"CREATE TABLE {catalog}{schema}demo(
                            host STRING,
                            ts TIMESTAMP,
                            TIME INDEX (ts),
                            PRIMARY KEY(host)
                        ) engine=mito;"#;
        expect_catalog_scoped(sql, &plugins, &query_ctx);

        // drop table
        expect_catalog_scoped("DROP TABLE {catalog}{schema}demo;", &plugins, &query_ctx);

        // SHOW TABLES FROM <db> is validated against the current catalog, so
        // any schema name passes here.
        expect_check("SHOW TABLES FROM public", &plugins, &query_ctx, true);
        expect_check("SHOW TABLES FROM private", &plugins, &query_ctx, true);

        // describe table
        expect_catalog_scoped("DESC TABLE {catalog}{schema}demo;", &plugins, &query_ctx);
    }
}