1pub mod builder;
16mod dashboard;
17mod grpc;
18mod influxdb;
19mod jaeger;
20mod log_handler;
21mod logs;
22mod opentsdb;
23mod otlp;
24pub mod prom_store;
25mod promql;
26mod region_query;
27pub mod standalone;
28
29use std::pin::Pin;
30use std::sync::atomic::AtomicBool;
31use std::sync::{Arc, atomic};
32use std::time::{Duration, SystemTime};
33
34use async_stream::stream;
35use async_trait::async_trait;
36use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
37use catalog::CatalogManagerRef;
38use catalog::process_manager::{
39 ProcessManagerRef, QueryStatement as CatalogQueryStatement, SlowQueryTimer,
40};
41use client::OutputData;
42use common_base::Plugins;
43use common_base::cancellation::CancellableFuture;
44use common_error::ext::{BoxedError, ErrorExt};
45use common_event_recorder::EventRecorderRef;
46use common_meta::cache_invalidator::CacheInvalidatorRef;
47use common_meta::key::TableMetadataManagerRef;
48use common_meta::key::table_name::TableNameKey;
49use common_meta::node_manager::NodeManagerRef;
50use common_meta::procedure_executor::ProcedureExecutorRef;
51use common_query::Output;
52use common_recordbatch::RecordBatchStreamWrapper;
53use common_recordbatch::error::StreamTimeoutSnafu;
54use common_telemetry::logging::SlowQueryOptions;
55use common_telemetry::{debug, error, tracing};
56use dashmap::DashMap;
57use datafusion_expr::LogicalPlan;
58use futures::{Stream, StreamExt};
59use lazy_static::lazy_static;
60use operator::delete::DeleterRef;
61use operator::insert::InserterRef;
62use operator::statement::{StatementExecutor, StatementExecutorRef};
63use partition::manager::PartitionRuleManagerRef;
64use pipeline::pipeline_operator::PipelineOperator;
65use prometheus::HistogramTimer;
66use promql_parser::label::Matcher;
67use query::QueryEngineRef;
68use query::metrics::OnDone;
69use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
70use query::query_engine::DescribeResult;
71use query::query_engine::options::{QueryOptions, validate_catalog_and_schema};
72use servers::error::{
73 self as server_error, AuthSnafu, CommonMetaSnafu, ExecuteQuerySnafu,
74 OtlpMetricModeIncompatibleSnafu, ParsePromQLSnafu, UnexpectedResultSnafu,
75};
76use servers::interceptor::{
77 PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
78};
79use servers::otlp::metrics::legacy_normalize_otlp_name;
80use servers::prometheus_handler::PrometheusHandler;
81use servers::query_handler::sql::SqlQueryHandler;
82use session::context::{Channel, QueryContextRef};
83use session::table_name::table_idents_to_full_name;
84use snafu::prelude::*;
85use sql::ast::ObjectNamePartExt;
86use sql::dialect::Dialect;
87use sql::parser::{ParseOptions, ParserContext};
88use sql::statements::comment::CommentObject;
89use sql::statements::copy::{CopyDatabase, CopyTable};
90use sql::statements::statement::Statement;
91use sql::statements::tql::Tql;
92use sqlparser::ast::ObjectName;
93pub use standalone::StandaloneDatanodeManager;
94use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM};
95use tracing::Span;
96
97use crate::error::{
98 self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, InvalidSqlSnafu,
99 ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
100 StatementTimeoutSnafu, TableOperationSnafu,
101};
102use crate::stream_wrapper::CancellableStreamWrapper;
103
// Default value for the OTLP metric compatibility table option
// (`OTLP_METRIC_COMPAT_KEY`): tables carrying no explicit option are
// treated as "legacy" mode (see the `unwrap_or` in `check_otlp_legacy`).
lazy_static! {
    static ref OTLP_LEGACY_DEFAULT_VALUE: String = "legacy".to_string();
}
107
/// The frontend instance: the entry point that wires catalog access,
/// statement planning/execution, and per-query bookkeeping (process
/// registration, slow-query timing, suspension) together.
#[derive(Clone)]
pub struct Instance {
    /// Catalog/schema metadata access (e.g. `schema_exists` checks).
    catalog_manager: CatalogManagerRef,
    pipeline_operator: Arc<PipelineOperator>,
    /// Plans and executes SQL/TQL statements and logical plans.
    statement_executor: Arc<StatementExecutor>,
    /// Engine used to execute and describe logical plans directly.
    query_engine: QueryEngineRef,
    /// Plugin registry: query interceptors, permission checkers, options.
    plugins: Plugins,
    inserter: InserterRef,
    deleter: DeleterRef,
    /// Table name/info lookups (used by the OTLP legacy-mode check).
    table_metadata_manager: TableMetadataManagerRef,
    /// Sink for slow-query events; `None` disables slow-query timing.
    event_recorder: Option<EventRecorderRef>,
    /// Registers running queries so they can be listed and cancelled.
    process_manager: ProcessManagerRef,
    /// Thresholds and sampling configuration for slow-query recording.
    slow_query_options: SlowQueryOptions,
    /// When set, query entry points reject new work with `SuspendedSnafu`.
    suspend: Arc<AtomicBool>,

    /// Per-database cache of OTLP metric table modes:
    /// db string -> (metric name -> is_legacy).
    otlp_metrics_table_legacy_cache: DashMap<String, DashMap<String, bool>>,
}
132
impl Instance {
    /// Returns the catalog manager.
    pub fn catalog_manager(&self) -> &CatalogManagerRef {
        &self.catalog_manager
    }

    /// Returns the query engine.
    pub fn query_engine(&self) -> &QueryEngineRef {
        &self.query_engine
    }

    /// Returns the plugin registry.
    pub fn plugins(&self) -> &Plugins {
        &self.plugins
    }

    /// Returns the statement executor.
    pub fn statement_executor(&self) -> &StatementExecutorRef {
        &self.statement_executor
    }

    /// Returns the table metadata manager.
    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
        &self.table_metadata_manager
    }

    /// Returns the inserter.
    pub fn inserter(&self) -> &InserterRef {
        &self.inserter
    }

    /// Returns the process manager used to track running queries.
    pub fn process_manager(&self) -> &ProcessManagerRef {
        &self.process_manager
    }

    /// Returns the node manager, delegated from the inserter.
    pub fn node_manager(&self) -> &NodeManagerRef {
        self.inserter.node_manager()
    }

    /// Returns the partition rule manager, delegated from the inserter.
    pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
        self.inserter.partition_manager()
    }

    /// Returns the cache invalidator, delegated from the statement executor.
    pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
        self.statement_executor.cache_invalidator()
    }

    /// Returns the procedure executor, delegated from the statement executor.
    pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
        self.statement_executor.procedure_executor()
    }

    /// Returns a handle to the shared suspension flag so callers can
    /// suspend/resume this instance.
    pub fn suspend_state(&self) -> Arc<AtomicBool> {
        self.suspend.clone()
    }

    /// Whether the instance is currently suspended (new queries are rejected).
    pub(crate) fn is_suspended(&self) -> bool {
        self.suspend.load(atomic::Ordering::Relaxed)
    }
}
186
187fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
188 ParserContext::create_with_dialect(sql, dialect, ParseOptions::default()).context(ParseSqlSnafu)
189}
190
impl Instance {
    /// Executes a single parsed statement.
    ///
    /// Read-only statements get the full bookkeeping treatment: an optional
    /// slow-query timer, registration with the process manager (so the query
    /// can be listed and killed), cancellation support, and wrapping of any
    /// result stream so that cancellation also aborts stream consumption.
    /// Non-read-only statements are executed directly with only the timeout
    /// applied.
    async fn query_statement(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
        check_permission(self.plugins.clone(), &stmt, &query_ctx)?;

        let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
        let query_interceptor = query_interceptor.as_ref();

        if stmt.is_readonly() {
            // Slow-query timing is only active when enabled AND an event
            // recorder is configured; the timer records on drop.
            let slow_query_timer = self
                .slow_query_options
                .enable
                .then(|| self.event_recorder.clone())
                .flatten()
                .map(|event_recorder| {
                    SlowQueryTimer::new(
                        CatalogQueryStatement::Sql(stmt.clone()),
                        self.slow_query_options.threshold,
                        self.slow_query_options.sample_ratio,
                        self.slow_query_options.record_type,
                        event_recorder,
                    )
                });

            // Register the query so it shows up in the process list; the
            // returned ticket carries the cancellation handle.
            let ticket = self.process_manager.register_query(
                query_ctx.current_catalog().to_string(),
                vec![query_ctx.current_schema()],
                stmt.to_string(),
                query_ctx.conn_info().to_string(),
                Some(query_ctx.process_id()),
                slow_query_timer,
            );

            let query_fut = self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor);

            CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
                .await
                .map_err(|_| error::CancelledSnafu.build())?
                .map(|output| {
                    let Output { meta, data } = output;

                    // Streamed results must stay cancellable while being
                    // consumed, not just while being produced.
                    let data = match data {
                        OutputData::Stream(stream) => OutputData::Stream(Box::pin(
                            CancellableStreamWrapper::new(stream, ticket),
                        )),
                        other => other,
                    };
                    Output { data, meta }
                })
        } else {
            self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor)
                .await
        }
    }

    /// Executes a statement, enforcing the session's query timeout when one
    /// applies (see [`derive_timeout`]); whatever timeout budget remains
    /// after execution is attached to the result stream.
    async fn exec_statement_with_timeout(
        &self,
        stmt: Statement,
        query_ctx: QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
    ) -> Result<Output> {
        let timeout = derive_timeout(&stmt, &query_ctx);
        match timeout {
            Some(timeout) => {
                let start = tokio::time::Instant::now();
                let output = tokio::time::timeout(
                    timeout,
                    self.exec_statement(stmt, query_ctx, query_interceptor),
                )
                .await
                .map_err(|_| StatementTimeoutSnafu.build())??;
                // The stream consumes whatever budget execution left over.
                let remaining_timeout = timeout.checked_sub(start.elapsed()).unwrap_or_default();
                attach_timeout(output, remaining_timeout)
            }
            None => {
                self.exec_statement(stmt, query_ctx, query_interceptor)
                    .await
            }
        }
    }

    /// Dispatches a statement to the appropriate execution path:
    /// plan-then-execute for queries/explains/deletes and TQL, direct
    /// execution for everything else (DDL etc.).
    async fn exec_statement(
        &self,
        stmt: Statement,
        query_ctx: QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
    ) -> Result<Output> {
        match stmt {
            Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
                // Propagate an EXPLAIN's requested output format into the
                // query context before planning.
                if let Statement::Explain(explain) = &stmt
                    && let Some(format) = explain.format()
                {
                    query_ctx.set_explain_format(format.to_string());
                }

                self.plan_and_exec_sql(stmt, &query_ctx, query_interceptor)
                    .await
            }
            Statement::Tql(tql) => {
                self.plan_and_exec_tql(&query_ctx, query_interceptor, tql)
                    .await
            }
            _ => {
                // No logical plan exists on this path, hence `None`.
                query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?;
                self.statement_executor
                    .execute_sql(stmt, query_ctx)
                    .await
                    .context(TableOperationSnafu)
            }
        }
    }

    /// Plans a SQL statement, runs the pre-execute interceptor with the
    /// plan, then executes the plan.
    async fn plan_and_exec_sql(
        &self,
        stmt: Statement,
        query_ctx: &QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
    ) -> Result<Output> {
        let stmt = QueryStatement::Sql(stmt);
        let plan = self
            .statement_executor
            .plan(&stmt, query_ctx.clone())
            .await?;
        // We wrapped a SQL statement above, so this always matches.
        let QueryStatement::Sql(stmt) = stmt else {
            unreachable!()
        };
        query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?;

        self.statement_executor
            .exec_plan(plan, query_ctx.clone())
            .await
            .context(TableOperationSnafu)
    }

    /// Plans a TQL statement, runs the pre-execute interceptor with the
    /// plan, then executes the plan.
    async fn plan_and_exec_tql(
        &self,
        query_ctx: &QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
        tql: Tql,
    ) -> Result<Output> {
        let plan = self
            .statement_executor
            .plan_tql(tql.clone(), query_ctx)
            .await?;
        query_interceptor.pre_execute(&Statement::Tql(tql), Some(&plan), query_ctx.clone())?;
        self.statement_executor
            .exec_plan(plan, query_ctx.clone())
            .await
            .context(TableOperationSnafu)
    }

    /// Decides whether the given OTLP metric names should be handled in
    /// "legacy" mode for the current database.
    ///
    /// Resolution order: the per-database cache first, then the table
    /// metadata (the `OTLP_METRIC_COMPAT_KEY` table option, defaulting to
    /// legacy). Mixing legacy and prom-compatible tables within one request
    /// is rejected. Results are written back into the cache for all `names`.
    async fn check_otlp_legacy(
        &self,
        names: &[&String],
        ctx: QueryContextRef,
    ) -> server_error::Result<bool> {
        let db_string = ctx.get_db_string();
        let cache = self
            .otlp_metrics_table_legacy_cache
            .entry(db_string.clone())
            .or_default();
        if let Some(flag) = fast_legacy_check(&cache, names)? {
            return Ok(flag);
        }
        // `cache` is a DashMap entry guard; it must be released before the
        // `.await`s below so other tasks aren't blocked on this shard.
        drop(cache);

        let catalog = ctx.current_catalog();
        let schema = ctx.current_schema();

        // Table names are stored in legacy-normalized form; normalize before
        // the metadata lookup.
        let normalized_names = names
            .iter()
            .map(|n| legacy_normalize_otlp_name(n))
            .collect::<Vec<_>>();
        let table_names = normalized_names
            .iter()
            .map(|n| TableNameKey::new(catalog, &schema, n))
            .collect::<Vec<_>>();
        let table_values = self
            .table_metadata_manager()
            .table_name_manager()
            .batch_get(table_names)
            .await
            .context(CommonMetaSnafu)?;
        let table_ids = table_values
            .into_iter()
            .filter_map(|v| v.map(|vi| vi.table_id()))
            .collect::<Vec<_>>();

        // No existing tables: the metrics are new, treat them as non-legacy
        // and remember that decision.
        if table_ids.is_empty() {
            let cache = self
                .otlp_metrics_table_legacy_cache
                .entry(db_string)
                .or_default();
            names.iter().for_each(|name| {
                cache.insert((*name).clone(), false);
            });
            return Ok(false);
        }

        let table_infos = self
            .table_metadata_manager()
            .table_info_manager()
            .batch_get(&table_ids)
            .await
            .context(CommonMetaSnafu)?;
        // A table with no compat option counts as legacy (the default).
        let options = table_infos
            .values()
            .map(|info| {
                info.table_info
                    .meta
                    .options
                    .extra_options
                    .get(OTLP_METRIC_COMPAT_KEY)
                    .unwrap_or(&OTLP_LEGACY_DEFAULT_VALUE)
            })
            .collect::<Vec<_>>();
        // Re-acquire the per-database cache now that the awaits are done.
        let cache = self
            .otlp_metrics_table_legacy_cache
            .entry(db_string)
            .or_default();
        if !options.is_empty() {
            let has_prom = options.iter().any(|opt| *opt == OTLP_METRIC_COMPAT_PROM);
            let has_legacy = options
                .iter()
                .any(|opt| *opt == OTLP_LEGACY_DEFAULT_VALUE.as_str());
            // One request cannot mix legacy- and prom-mode tables.
            ensure!(!(has_prom && has_legacy), OtlpMetricModeIncompatibleSnafu);
            let flag = has_legacy;
            names.iter().for_each(|name| {
                cache.insert((*name).clone(), flag);
            });
            Ok(flag)
        } else {
            names.iter().for_each(|name| {
                cache.insert((*name).clone(), false);
            });
            Ok(false)
        }
    }
}
438
439fn fast_legacy_check(
440 cache: &DashMap<String, bool>,
441 names: &[&String],
442) -> server_error::Result<Option<bool>> {
443 let hit_cache = names
444 .iter()
445 .filter_map(|name| cache.get(*name))
446 .collect::<Vec<_>>();
447 if !hit_cache.is_empty() {
448 let hit_legacy = hit_cache.iter().any(|en| *en.value());
449 let hit_prom = hit_cache.iter().any(|en| !*en.value());
450
451 ensure!(!(hit_legacy && hit_prom), OtlpMetricModeIncompatibleSnafu);
455
456 let flag = hit_legacy;
457 drop(hit_cache);
459
460 names.iter().for_each(|name| {
462 if !cache.contains_key(*name) {
463 cache.insert((*name).clone(), flag);
464 }
465 });
466 Ok(Some(flag))
467 } else {
468 Ok(None)
469 }
470}
471
472fn derive_timeout(stmt: &Statement, query_ctx: &QueryContextRef) -> Option<Duration> {
475 let query_timeout = query_ctx.query_timeout()?;
476 if query_timeout.is_zero() {
477 return None;
478 }
479 match query_ctx.channel() {
480 Channel::Mysql if stmt.is_readonly() => Some(query_timeout),
481 Channel::Postgres => Some(query_timeout),
482 _ => None,
483 }
484}
485
486fn derive_timeout_for_plan(plan: &LogicalPlan, query_ctx: &QueryContextRef) -> Option<Duration> {
488 let query_timeout = query_ctx.query_timeout()?;
489 if query_timeout.is_zero() {
490 return None;
491 }
492 match query_ctx.channel() {
493 Channel::Mysql if is_readonly_plan(plan) => Some(query_timeout),
494 Channel::Postgres => Some(query_timeout),
495 _ => None,
496 }
497}
498
/// Applies the remaining timeout budget to a query output.
///
/// Fully materialized outputs (affected rows, record batches) pass through
/// untouched — execution already finished within budget. Streamed outputs
/// are wrapped so that each `next()` call is bounded by the remaining
/// budget, which is decremented as items are produced; exhausting it yields
/// a `StreamTimeout` error. A zero budget on entry fails immediately.
fn attach_timeout(output: Output, mut timeout: Duration) -> Result<Output> {
    if timeout.is_zero() {
        return StatementTimeoutSnafu.fail();
    }

    let output = match output.data {
        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
        OutputData::Stream(mut stream) => {
            let schema = stream.schema();
            let s = Box::pin(stream! {
                let mut start = tokio::time::Instant::now();
                // Each poll of the inner stream is capped at the remaining budget.
                while let Some(item) = tokio::time::timeout(timeout, stream.next()).await.map_err(|_| StreamTimeoutSnafu.build())? {
                    yield item;

                    // Consume the elapsed time from the budget; saturate at zero.
                    let now = tokio::time::Instant::now();
                    timeout = timeout.checked_sub(now - start).unwrap_or(Duration::ZERO);
                    start = now;
                    if timeout.is_zero() {
                        StreamTimeoutSnafu.fail()?;
                    }
                }
            }) as Pin<Box<dyn Stream<Item = _> + Send>>;
            // Re-wrap so the output still looks like a record batch stream,
            // preserving the schema and current tracing span.
            let stream = RecordBatchStreamWrapper {
                schema,
                stream: s,
                output_ordering: None,
                metrics: Default::default(),
                span: Span::current(),
            };
            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
        }
    };

    Ok(output)
}
535
impl Instance {
    /// Parses and executes a (possibly multi-statement) SQL string,
    /// returning one result per statement.
    ///
    /// Interceptor hooks run around parsing and each execution; per-statement
    /// permission checks run before execution. On the first failed permission
    /// check or execution error the loop stops — later statements are not run.
    #[tracing::instrument(skip_all, name = "SqlQueryHandler::do_query")]
    async fn do_query_inner(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
        if self.is_suspended() {
            return vec![error::SuspendedSnafu {}.fail()];
        }

        let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
        let query_interceptor = query_interceptor_opt.as_ref();
        // The interceptor may rewrite the raw SQL before parsing.
        let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) {
            Ok(q) => q,
            Err(e) => return vec![Err(e)],
        };

        let checker_ref = self.plugins.get::<PermissionCheckerRef>();
        let checker = checker_ref.as_ref();

        match parse_stmt(query.as_ref(), query_ctx.sql_dialect())
            .and_then(|stmts| query_interceptor.post_parsing(stmts, query_ctx.clone()))
        {
            Ok(stmts) => {
                if stmts.is_empty() {
                    return vec![
                        InvalidSqlSnafu {
                            err_msg: "empty statements",
                        }
                        .fail(),
                    ];
                }

                let mut results = Vec::with_capacity(stmts.len());
                for stmt in stmts {
                    // Permission failure aborts the remaining statements.
                    if let Err(e) = checker
                        .check_permission(
                            query_ctx.current_user(),
                            PermissionReq::SqlStatement(&stmt),
                        )
                        .context(PermissionSnafu)
                    {
                        results.push(Err(e));
                        break;
                    }

                    match self.query_statement(stmt.clone(), query_ctx.clone()).await {
                        Ok(output) => {
                            let output_result =
                                query_interceptor.post_execute(output, query_ctx.clone());
                            results.push(output_result);
                        }
                        Err(e) => {
                            // Only log at error level for status codes that
                            // warrant it; otherwise keep it at debug.
                            if e.status_code().should_log_error() {
                                error!(e; "Failed to execute query: {stmt}");
                            } else {
                                debug!("Failed to execute query: {stmt}, {e}");
                            }
                            results.push(Err(e));
                            break;
                        }
                    }
                }
                results
            }
            Err(e) => {
                vec![Err(e)]
            }
        }
    }

    /// Executes a logical plan directly on the query engine.
    async fn exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
        self.query_engine
            .execute(plan, query_ctx)
            .await
            .context(ExecLogicalPlanSnafu)
    }

    /// Executes a logical plan, enforcing the session timeout when one
    /// applies (see [`derive_timeout_for_plan`]); leftover budget is
    /// attached to the result stream.
    async fn exec_plan_with_timeout(
        &self,
        plan: LogicalPlan,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        let timeout = derive_timeout_for_plan(&plan, &query_ctx);
        match timeout {
            Some(timeout) => {
                let start = tokio::time::Instant::now();
                let output = tokio::time::timeout(timeout, self.exec_plan(plan, query_ctx))
                    .await
                    .map_err(|_| StatementTimeoutSnafu.build())??;
                let remaining_timeout = timeout.checked_sub(start.elapsed()).unwrap_or_default();
                attach_timeout(output, remaining_timeout)
            }
            None => self.exec_plan(plan, query_ctx).await,
        }
    }

    /// Executes a pre-built logical plan with full bookkeeping.
    ///
    /// Mirrors `query_statement`: read-only plans get process registration,
    /// slow-query timing, cancellation, and cancellable result streams;
    /// other plans run directly. `stmt`, when provided, is used for the
    /// interceptor hook and as the display text of the registered query.
    async fn do_exec_plan_inner(
        &self,
        plan: LogicalPlan,
        stmt: Option<Statement>,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        ensure!(!self.is_suspended(), error::SuspendedSnafu);

        let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
        let query_interceptor = query_interceptor_opt.as_ref();

        if let Some(ref s) = stmt {
            query_interceptor.pre_execute(s, Some(&plan), query_ctx.clone())?;
        }

        // Human-readable query text for the process list / slow query log:
        // prefer the original statement, fall back to the rendered plan.
        let query = stmt
            .as_ref()
            .map(|s| s.to_string())
            .unwrap_or_else(|| plan.display_indent().to_string());

        let result = if is_readonly_plan(&plan) {
            let slow_query_timer = self
                .slow_query_options
                .enable
                .then(|| self.event_recorder.clone())
                .flatten()
                .map(|event_recorder| {
                    SlowQueryTimer::new(
                        CatalogQueryStatement::Plan(query.clone()),
                        self.slow_query_options.threshold,
                        self.slow_query_options.sample_ratio,
                        self.slow_query_options.record_type,
                        event_recorder,
                    )
                });

            let ticket = self.process_manager.register_query(
                query_ctx.current_catalog().to_string(),
                vec![query_ctx.current_schema()],
                query,
                query_ctx.conn_info().to_string(),
                Some(query_ctx.process_id()),
                slow_query_timer,
            );

            let query_fut = self.exec_plan_with_timeout(plan, query_ctx.clone());

            CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
                .await
                .map_err(|_| error::CancelledSnafu.build())?
                .map(|output| {
                    let Output { meta, data } = output;

                    // Keep streamed results cancellable during consumption.
                    let data = match data {
                        OutputData::Stream(stream) => OutputData::Stream(Box::pin(
                            CancellableStreamWrapper::new(stream, ticket),
                        )),
                        other => other,
                    };
                    Output { data, meta }
                })
        } else {
            self.exec_plan_with_timeout(plan, query_ctx.clone()).await
        };

        result.and_then(|output| query_interceptor.post_execute(output, query_ctx))
    }

    /// Executes a PromQL query by delegating to the `PrometheusHandler`
    /// implementation; always returns exactly one result.
    #[tracing::instrument(skip_all, name = "SqlQueryHandler::do_promql_query")]
    async fn do_promql_query_inner(
        &self,
        query: &PromQuery,
        query_ctx: QueryContextRef,
    ) -> Vec<Result<Output>> {
        if self.is_suspended() {
            return vec![error::SuspendedSnafu {}.fail()];
        }

        let result = PrometheusHandler::do_query(self, query, query_ctx)
            .await
            .with_context(|_| ExecutePromqlSnafu {
                query: format!("{query:?}"),
            });
        vec![result]
    }

    /// Describes the output schema of a statement without executing it.
    ///
    /// Only plannable statements (INSERT/QUERY/DELETE, possibly wrapped in
    /// EXPLAIN) are described; everything else yields `Ok(None)`.
    async fn do_describe_inner(
        &self,
        stmt: Statement,
        query_ctx: QueryContextRef,
    ) -> Result<Option<DescribeResult>> {
        ensure!(!self.is_suspended(), error::SuspendedSnafu);

        let is_inner_plannable = |s: &Statement| {
            matches!(
                s,
                Statement::Insert(_) | Statement::Query(_) | Statement::Delete(_)
            )
        };
        // EXPLAIN is plannable iff its inner statement is.
        let plannable = is_inner_plannable(&stmt)
            || matches!(&stmt, Statement::Explain(explain) if is_inner_plannable(explain.statement.as_ref()));

        if plannable {
            self.plugins
                .get::<PermissionCheckerRef>()
                .as_ref()
                .check_permission(query_ctx.current_user(), PermissionReq::SqlStatement(&stmt))
                .context(PermissionSnafu)?;

            let plan = self
                .query_engine
                .planner()
                .plan(&QueryStatement::Sql(stmt), query_ctx.clone())
                .await
                .context(PlanStatementSnafu)?;
            self.query_engine
                .describe(plan, query_ctx)
                .await
                .map(Some)
                .context(error::DescribeStatementSnafu)
        } else {
            Ok(None)
        }
    }

    /// Whether the given catalog/schema pair exists.
    async fn is_valid_schema_inner(&self, catalog: &str, schema: &str) -> Result<bool> {
        self.catalog_manager
            .schema_exists(catalog, schema, None)
            .await
            .context(error::CatalogSnafu)
    }
}
766
// Server-facing trait implementation: each method delegates to the
// corresponding `*_inner` method and maps this crate's errors into the
// `servers` error domain via `BoxedError`.
#[async_trait]
impl SqlQueryHandler for Instance {
    /// Executes a SQL string; one result per statement.
    async fn do_query(
        &self,
        query: &str,
        query_ctx: QueryContextRef,
    ) -> Vec<server_error::Result<Output>> {
        self.do_query_inner(query, query_ctx)
            .await
            .into_iter()
            .map(|result| result.map_err(BoxedError::new).context(ExecuteQuerySnafu))
            .collect()
    }

    /// Executes a pre-built logical plan (optionally with its originating
    /// statement for interceptors and display).
    async fn do_exec_plan(
        &self,
        plan: LogicalPlan,
        stmt: Option<Statement>,
        query_ctx: QueryContextRef,
    ) -> server_error::Result<Output> {
        self.do_exec_plan_inner(plan, stmt, query_ctx)
            .await
            .map_err(BoxedError::new)
            .context(server_error::ExecutePlanSnafu)
    }

    /// Executes a PromQL query; always a single result.
    async fn do_promql_query(
        &self,
        query: &PromQuery,
        query_ctx: QueryContextRef,
    ) -> Vec<server_error::Result<Output>> {
        self.do_promql_query_inner(query, query_ctx)
            .await
            .into_iter()
            .map(|result| result.map_err(BoxedError::new).context(ExecuteQuerySnafu))
            .collect()
    }

    /// Describes a statement's output schema without executing it.
    async fn do_describe(
        &self,
        stmt: Statement,
        query_ctx: QueryContextRef,
    ) -> server_error::Result<Option<DescribeResult>> {
        self.do_describe_inner(stmt, query_ctx)
            .await
            .map_err(BoxedError::new)
            .context(server_error::DescribeStatementSnafu)
    }

    /// Whether the given catalog/schema pair exists.
    async fn is_valid_schema(&self, catalog: &str, schema: &str) -> server_error::Result<bool> {
        self.is_valid_schema_inner(catalog, schema)
            .await
            .map_err(BoxedError::new)
            .context(server_error::CheckDatabaseValiditySnafu)
    }
}
823
824pub fn attach_timer(output: Output, timer: HistogramTimer) -> Output {
826 match output.data {
827 OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
828 OutputData::Stream(stream) => {
829 let stream = OnDone::new(stream, move || {
830 timer.observe_duration();
831 });
832 Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
833 }
834 }
835}
836
#[async_trait]
impl PrometheusHandler for Instance {
    /// Executes a PromQL query end to end: permission check, parse, plan,
    /// interceptor hooks, process registration with optional slow-query
    /// timing, cancellable execution, and cancellable result streams.
    #[tracing::instrument(skip_all)]
    async fn do_query(
        &self,
        query: &PromQuery,
        query_ctx: QueryContextRef,
    ) -> server_error::Result<Output> {
        let interceptor = self
            .plugins
            .get::<PromQueryInterceptorRef<server_error::Error>>();

        self.plugins
            .get::<PermissionCheckerRef>()
            .as_ref()
            .check_permission(query_ctx.current_user(), PermissionReq::PromQuery)
            .context(AuthSnafu)?;

        let stmt = QueryLanguageParser::parse_promql(query, &query_ctx).with_context(|_| {
            ParsePromQLSnafu {
                query: query.clone(),
            }
        })?;

        let plan = self
            .statement_executor
            .plan(&stmt, query_ctx.clone())
            .await
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)?;

        interceptor.pre_execute(query, Some(&plan), query_ctx.clone())?;

        // `parse_promql` is expected to always yield a PromQL statement;
        // anything else is an internal inconsistency.
        let query_statement = if let QueryStatement::Promql(eval_stmt, alias) = stmt {
            CatalogQueryStatement::Promql(eval_stmt, alias)
        } else {
            return UnexpectedResultSnafu {
                reason: "The query should always be promql.".to_string(),
            }
            .fail();
        };
        let query = query_statement.to_string();

        // Slow-query timing is only active when enabled AND a recorder exists.
        let slow_query_timer = self
            .slow_query_options
            .enable
            .then(|| self.event_recorder.clone())
            .flatten()
            .map(|event_recorder| {
                SlowQueryTimer::new(
                    query_statement,
                    self.slow_query_options.threshold,
                    self.slow_query_options.sample_ratio,
                    self.slow_query_options.record_type,
                    event_recorder,
                )
            });

        // Register so the query is visible in the process list and killable.
        let ticket = self.process_manager.register_query(
            query_ctx.current_catalog().to_string(),
            vec![query_ctx.current_schema()],
            query,
            query_ctx.conn_info().to_string(),
            Some(query_ctx.process_id()),
            slow_query_timer,
        );

        let query_fut = self.statement_executor.exec_plan(plan, query_ctx.clone());

        let output = CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
            .await
            .map_err(|_| servers::error::CancelledSnafu.build())?
            .map(|output| {
                let Output { meta, data } = output;
                // Keep streamed results cancellable while being consumed.
                let data = match data {
                    OutputData::Stream(stream) => {
                        OutputData::Stream(Box::pin(CancellableStreamWrapper::new(stream, ticket)))
                    }
                    other => other,
                };
                Output { data, meta }
            })
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)?;

        Ok(interceptor.post_execute(output, query_ctx)?)
    }

    /// Lists metric names matching the given matchers.
    async fn query_metric_names(
        &self,
        matchers: Vec<Matcher>,
        ctx: &QueryContextRef,
    ) -> server_error::Result<Vec<String>> {
        self.handle_query_metric_names(matchers, ctx)
            .await
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)
    }

    /// Lists values of `label_name` for `metric` within `[start, end]`,
    /// filtered by the matchers.
    async fn query_label_values(
        &self,
        metric: String,
        label_name: String,
        matchers: Vec<Matcher>,
        start: SystemTime,
        end: SystemTime,
        ctx: &QueryContextRef,
    ) -> server_error::Result<Vec<String>> {
        self.handle_query_label_values(metric, label_name, matchers, start, end, ctx)
            .await
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)
    }

    /// Returns a clone of the catalog manager handle.
    fn catalog_manager(&self) -> CatalogManagerRef {
        self.catalog_manager.clone()
    }
}
957
/// Validates that the statement's optional `database` field (when present)
/// refers to the session's current catalog, mapping violations into
/// `SqlExecInterceptedSnafu`. Used by the `SHOW ...` arms of
/// `check_permission`.
macro_rules! validate_db_permission {
    ($stmt: expr, $query_ctx: expr) => {
        if let Some(database) = &$stmt.database {
            validate_catalog_and_schema($query_ctx.current_catalog(), database, $query_ctx)
                .map_err(BoxedError::new)
                .context(SqlExecInterceptedSnafu)?;
        }
    };
}
968
/// Enforces the cross-catalog access policy for a statement.
///
/// When `QueryOptions::disallow_cross_catalog_query` is set, every object
/// name referenced by the statement must resolve into the session's current
/// catalog/schema; otherwise this is a no-op. Statements whose object
/// resolution happens later in the planner (queries, TQL, cursors, ...) and
/// purely database-level statements are exempt here.
///
/// # Errors
///
/// Returns `SqlExecInterceptedSnafu` (or parse/conversion errors) when a
/// referenced object lies outside the current catalog.
pub fn check_permission(
    plugins: Plugins,
    stmt: &Statement,
    query_ctx: &QueryContextRef,
) -> Result<()> {
    let need_validate = plugins
        .get::<QueryOptions>()
        .map(|opts| opts.disallow_cross_catalog_query)
        .unwrap_or_default();

    if !need_validate {
        return Ok(());
    }

    match stmt {
        // Exempt: validated elsewhere or not tied to a specific catalog.
        Statement::Admin(_) => {}
        Statement::Query(_)
        | Statement::Explain(_)
        | Statement::Tql(_)
        | Statement::Delete(_)
        | Statement::DeclareCursor(_)
        | Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(_)) => {}
        // Database-level statements are exempt.
        Statement::CreateDatabase(_)
        | Statement::ShowDatabases(_)
        | Statement::DropDatabase(_)
        | Statement::AlterDatabase(_)
        | Statement::DropFlow(_)
        | Statement::Use(_) => {}
        #[cfg(feature = "enterprise")]
        Statement::DropTrigger(_) => {}
        Statement::ShowCreateDatabase(stmt) => {
            validate_database(&stmt.database_name, query_ctx)?;
        }
        Statement::ShowCreateTable(stmt) => {
            validate_param(&stmt.table_name, query_ctx)?;
        }
        Statement::ShowCreateFlow(stmt) => {
            validate_flow(&stmt.flow_name, query_ctx)?;
        }
        #[cfg(feature = "enterprise")]
        Statement::ShowCreateTrigger(stmt) => {
            validate_param(&stmt.trigger_name, query_ctx)?;
        }
        Statement::ShowCreateView(stmt) => {
            validate_param(&stmt.view_name, query_ctx)?;
        }
        Statement::CreateExternalTable(stmt) => {
            validate_param(&stmt.name, query_ctx)?;
        }
        Statement::CreateFlow(stmt) => {
            validate_param(&stmt.sink_table_name, query_ctx)?;
        }
        #[cfg(feature = "enterprise")]
        Statement::CreateTrigger(stmt) => {
            validate_param(&stmt.trigger_name, query_ctx)?;
        }
        Statement::CreateView(stmt) => {
            validate_param(&stmt.name, query_ctx)?;
        }
        Statement::AlterTable(stmt) => {
            validate_param(stmt.table_name(), query_ctx)?;
        }
        #[cfg(feature = "enterprise")]
        Statement::AlterTrigger(_) => {}
        // Session-level statements carry no object names to validate.
        Statement::SetVariables(_) | Statement::ShowVariables(_) => {}
        Statement::ShowCharset(_) | Statement::ShowCollation(_) => {}

        Statement::Comment(comment) => match &comment.object {
            CommentObject::Table(table) => validate_param(table, query_ctx)?,
            CommentObject::Column { table, .. } => validate_param(table, query_ctx)?,
            CommentObject::Flow(flow) => validate_flow(flow, query_ctx)?,
        },

        Statement::Insert(insert) => {
            let name = insert.table_name().context(ParseSqlSnafu)?;
            validate_param(name, query_ctx)?;
        }
        Statement::CreateTable(stmt) => {
            validate_param(&stmt.name, query_ctx)?;
        }
        Statement::CreateTableLike(stmt) => {
            // Both the new table and the template table must be local.
            validate_param(&stmt.table_name, query_ctx)?;
            validate_param(&stmt.source_name, query_ctx)?;
        }
        Statement::DropTable(drop_stmt) => {
            for table_name in drop_stmt.table_names() {
                validate_param(table_name, query_ctx)?;
            }
        }
        Statement::DropView(stmt) => {
            validate_param(&stmt.view_name, query_ctx)?;
        }
        // SHOW statements with an optional database qualifier.
        Statement::ShowTables(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowTableStatus(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowColumns(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowIndex(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowRegion(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowViews(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowFlows(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        #[cfg(feature = "enterprise")]
        Statement::ShowTriggers(_stmt) => {
        }
        Statement::ShowStatus(_stmt) => {}
        Statement::ShowSearchPath(_stmt) => {}
        Statement::DescribeTable(stmt) => {
            validate_param(stmt.name(), query_ctx)?;
        }
        Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => match stmt {
            CopyTable::To(copy_table_to) => validate_param(&copy_table_to.table_name, query_ctx)?,
            CopyTable::From(copy_table_from) => {
                validate_param(&copy_table_from.table_name, query_ctx)?
            }
        },
        Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
            match copy_database {
                CopyDatabase::To(stmt) => validate_database(&stmt.database_name, query_ctx)?,
                CopyDatabase::From(stmt) => validate_database(&stmt.database_name, query_ctx)?,
            }
        }
        Statement::TruncateTable(stmt) => {
            validate_param(stmt.table_name(), query_ctx)?;
        }
        Statement::FetchCursor(_) | Statement::CloseCursor(_) => {}
        Statement::Kill(_) => {}
        Statement::ShowProcesslist(_) => {}
    }
    Ok(())
}
1123
1124fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
1125 let (catalog, schema, _) = table_idents_to_full_name(name, query_ctx)
1126 .map_err(BoxedError::new)
1127 .context(ExternalSnafu)?;
1128
1129 validate_catalog_and_schema(&catalog, &schema, query_ctx)
1130 .map_err(BoxedError::new)
1131 .context(SqlExecInterceptedSnafu)
1132}
1133
1134fn validate_flow(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
1135 let catalog = match &name.0[..] {
1136 [_flow] => query_ctx.current_catalog().to_string(),
1137 [catalog, _flow] => catalog.to_string_unquoted(),
1138 _ => {
1139 return InvalidSqlSnafu {
1140 err_msg: format!(
1141 "expect flow name to be <catalog>.<flow_name> or <flow_name>, actual: {name}",
1142 ),
1143 }
1144 .fail();
1145 }
1146 };
1147
1148 let schema = query_ctx.current_schema();
1149
1150 validate_catalog_and_schema(&catalog, &schema, query_ctx)
1151 .map_err(BoxedError::new)
1152 .context(SqlExecInterceptedSnafu)
1153}
1154
1155fn validate_database(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
1156 let (catalog, schema) = match &name.0[..] {
1157 [schema] => (
1158 query_ctx.current_catalog().to_string(),
1159 schema.to_string_unquoted(),
1160 ),
1161 [catalog, schema] => (catalog.to_string_unquoted(), schema.to_string_unquoted()),
1162 _ => InvalidSqlSnafu {
1163 err_msg: format!(
1164 "expect database name to be <catalog>.<schema> or <schema>, actual: {name}",
1165 ),
1166 }
1167 .fail()?,
1168 };
1169
1170 validate_catalog_and_schema(&catalog, &schema, query_ctx)
1171 .map_err(BoxedError::new)
1172 .context(SqlExecInterceptedSnafu)
1173}
1174
1175fn is_readonly_plan(plan: &LogicalPlan) -> bool {
1176 !matches!(plan, LogicalPlan::Dml(_) | LogicalPlan::Ddl(_))
1177}
1178
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::{Arc, Barrier};
    use std::thread;
    use std::time::{Duration, Instant};

    use common_base::Plugins;
    use query::query_engine::options::QueryOptions;
    use session::context::QueryContext;
    use sql::dialect::GreptimeDbDialect;
    use strfmt::Format;

    use super::*;

    #[test]
    fn test_fast_legacy_check_deadlock_prevention() {
        // Seed a cache with a mix of legacy (true) and non-legacy (false)
        // metrics.
        let cache = DashMap::new();
        cache.insert("metric1".to_string(), true);
        cache.insert("metric2".to_string(), false);
        cache.insert("metric3".to_string(), true);

        // A known legacy metric plus an unseen one: the unseen metric is
        // classified as legacy and inserted into the cache.
        let metric1 = "metric1".to_string();
        let metric4 = "metric4".to_string();
        let names1 = vec![&metric1, &metric4];
        let result = fast_legacy_check(&cache, &names1);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), Some(true));
        assert!(cache.contains_key("metric4"));
        assert!(*cache.get("metric4").unwrap().value());

        // Two completely unseen metrics: no decision can be made.
        let metric5 = "metric5".to_string();
        let metric6 = "metric6".to_string();
        let names2 = vec![&metric5, &metric6];
        let result = fast_legacy_check(&cache, &names2);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), None);

        // Mixing a legacy and a non-legacy metric in one batch is an error.
        let cache_incompatible = DashMap::new();
        cache_incompatible.insert("metric1".to_string(), true);
        cache_incompatible.insert("metric2".to_string(), false);
        let metric1_test = "metric1".to_string();
        let metric2_test = "metric2".to_string();
        let names3 = vec![&metric1_test, &metric2_test];
        let result = fast_legacy_check(&cache_incompatible, &names3);
        assert!(result.is_err());

        // Hammer one shared cache from many threads to make sure concurrent
        // lookups and insertions never deadlock.
        let cache_concurrent = Arc::new(DashMap::new());
        cache_concurrent.insert("shared_metric".to_string(), true);

        let num_threads = 8;
        let operations_per_thread = 100;
        let barrier = Arc::new(Barrier::new(num_threads));
        let success_flag = Arc::new(AtomicBool::new(true));

        let handles: Vec<_> = (0..num_threads)
            .map(|thread_id| {
                let cache_clone = Arc::clone(&cache_concurrent);
                let barrier_clone = Arc::clone(&barrier);
                let success_flag_clone = Arc::clone(&success_flag);

                thread::spawn(move || {
                    // Line all threads up so they contend on the cache at once.
                    barrier_clone.wait();

                    let start_time = Instant::now();
                    for i in 0..operations_per_thread {
                        let shared_metric = "shared_metric".to_string();
                        let new_metric = format!("thread_{}_metric_{}", thread_id, i);
                        let names = vec![&shared_metric, &new_metric];

                        if fast_legacy_check(&cache_clone, &names).is_err() {
                            success_flag_clone.store(false, Ordering::Relaxed);
                            return;
                        }

                        // A batch taking this long suggests a stuck lock.
                        if start_time.elapsed() > Duration::from_secs(10) {
                            success_flag_clone.store(false, Ordering::Relaxed);
                            return;
                        }
                    }
                })
            })
            .collect();

        let start_time = Instant::now();
        for (i, handle) in handles.into_iter().enumerate() {
            let join_result = handle.join();

            if start_time.elapsed() > Duration::from_secs(30) {
                panic!("Test timed out - possible deadlock detected!");
            }

            if join_result.is_err() {
                panic!("Thread {} panicked during execution", i);
            }
        }

        assert!(
            success_flag.load(Ordering::Relaxed),
            "Some operations failed"
        );

        // Every thread inserted fresh metrics, so the cache must have grown
        // well past its initial single entry.
        let final_count = cache_concurrent.len();
        assert!(
            final_count > 1 + num_threads * operations_per_thread / 2,
            "Expected more cache entries, got {}",
            final_count
        );
    }

    #[test]
    fn test_exec_validation() {
        let query_ctx = QueryContext::arc();
        let plugins: Plugins = Plugins::new();
        plugins.insert(QueryOptions {
            disallow_cross_catalog_query: true,
        });

        // Statements that only touch the session's own catalog pass.
        let sql = r#"
            SELECT * FROM demo;
            EXPLAIN SELECT * FROM demo;
            CREATE DATABASE test_database;
            SHOW DATABASES;
        "#;
        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        assert_eq!(stmts.len(), 4);
        for stmt in stmts {
            check_permission(plugins.clone(), &stmt, &query_ctx).unwrap();
        }

        let sql = r#"
            SHOW CREATE TABLE demo;
            ALTER TABLE demo ADD COLUMN new_col INT;
        "#;
        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        assert_eq!(stmts.len(), 2);
        for stmt in stmts {
            check_permission(plugins.clone(), &stmt, &query_ctx).unwrap();
        }

        // Renders `template_sql` with both permitted and cross-catalog
        // qualifiers and checks the permission outcome for each.
        fn replace_test(template_sql: &str, plugins: Plugins, query_ctx: &QueryContextRef) {
            // Qualifiers resolving to the session's own catalog must pass.
            let right = vec![("", ""), ("", "public."), ("greptime.", "public.")];
            for (catalog, schema) in right {
                let sql = do_fmt(template_sql, catalog, schema);
                do_test(&sql, plugins.clone(), query_ctx, true);
            }

            // Foreign catalogs must be rejected.
            let wrong = vec![
                ("wrongcatalog.", "public."),
                ("wrongcatalog.", "wrongschema."),
            ];
            for (catalog, schema) in wrong {
                let sql = do_fmt(template_sql, catalog, schema);
                do_test(&sql, plugins.clone(), query_ctx, false);
            }
        }

        // Substitutes the `{catalog}` / `{schema}` placeholders in a template.
        fn do_fmt(template: &str, catalog: &str, schema: &str) -> String {
            let vars = HashMap::from([
                ("catalog".to_string(), catalog),
                ("schema".to_string(), schema),
            ]);
            template.format(&vars).unwrap()
        }

        // Parses the first statement of `sql` and asserts whether the
        // permission check is expected to succeed.
        fn do_test(sql: &str, plugins: Plugins, query_ctx: &QueryContextRef, is_ok: bool) {
            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
            let re = check_permission(plugins, stmt, query_ctx);
            if is_ok {
                re.unwrap();
            } else {
                assert!(re.is_err());
            }
        }

        // INSERT
        let sql = "INSERT INTO {catalog}{schema}monitor(host) VALUES ('host1');";
        replace_test(sql, plugins.clone(), &query_ctx);

        // CREATE TABLE
        let sql = r#"CREATE TABLE {catalog}{schema}demo(
                            host STRING,
                            ts TIMESTAMP,
                            TIME INDEX (ts),
                            PRIMARY KEY(host)
                        ) engine=mito;"#;
        replace_test(sql, plugins.clone(), &query_ctx);

        // DROP TABLE
        let sql = "DROP TABLE {catalog}{schema}demo;";
        replace_test(sql, plugins.clone(), &query_ctx);

        // SHOW TABLES carries no catalog qualifier, so any schema is accepted.
        let sql = "SHOW TABLES FROM public";
        let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        check_permission(plugins.clone(), &stmt[0], &query_ctx).unwrap();

        let sql = "SHOW TABLES FROM private";
        let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        assert!(check_permission(plugins.clone(), &stmt[0], &query_ctx).is_ok());

        // DESC TABLE
        let sql = "DESC TABLE {catalog}{schema}demo;";
        replace_test(sql, plugins.clone(), &query_ctx);

        // COMMENT ON FLOW accepts only the session's current catalog.
        let comment_flow_cases = [
            ("COMMENT ON FLOW my_flow IS 'comment';", true),
            ("COMMENT ON FLOW greptime.my_flow IS 'comment';", true),
            ("COMMENT ON FLOW wrongcatalog.my_flow IS 'comment';", false),
        ];
        for (sql, is_ok) in comment_flow_cases {
            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
            let result = check_permission(plugins.clone(), stmt, &query_ctx);
            assert_eq!(result.is_ok(), is_ok);
        }

        // SHOW CREATE FLOW follows the same catalog rule.
        let show_flow_cases = [
            ("SHOW CREATE FLOW my_flow;", true),
            ("SHOW CREATE FLOW greptime.my_flow;", true),
            ("SHOW CREATE FLOW wrongcatalog.my_flow;", false),
        ];
        for (sql, is_ok) in show_flow_cases {
            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
            let result = check_permission(plugins.clone(), stmt, &query_ctx);
            assert_eq!(result.is_ok(), is_ok);
        }
    }
}