1pub mod builder;
16mod grpc;
17mod influxdb;
18mod jaeger;
19mod log_handler;
20mod logs;
21mod opentsdb;
22mod otlp;
23pub mod prom_store;
24mod promql;
25mod region_query;
26pub mod standalone;
27
28use std::pin::Pin;
29use std::sync::atomic::AtomicBool;
30use std::sync::{Arc, atomic};
31use std::time::{Duration, SystemTime};
32
33use async_stream::stream;
34use async_trait::async_trait;
35use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
36use catalog::CatalogManagerRef;
37use catalog::process_manager::{
38 ProcessManagerRef, QueryStatement as CatalogQueryStatement, SlowQueryTimer,
39};
40use client::OutputData;
41use common_base::Plugins;
42use common_base::cancellation::CancellableFuture;
43use common_error::ext::{BoxedError, ErrorExt};
44use common_event_recorder::EventRecorderRef;
45use common_meta::cache_invalidator::CacheInvalidatorRef;
46use common_meta::key::TableMetadataManagerRef;
47use common_meta::key::table_name::TableNameKey;
48use common_meta::node_manager::NodeManagerRef;
49use common_meta::procedure_executor::ProcedureExecutorRef;
50use common_query::Output;
51use common_recordbatch::RecordBatchStreamWrapper;
52use common_recordbatch::error::StreamTimeoutSnafu;
53use common_telemetry::logging::SlowQueryOptions;
54use common_telemetry::{debug, error, tracing};
55use dashmap::DashMap;
56use datafusion_expr::LogicalPlan;
57use futures::{Stream, StreamExt};
58use lazy_static::lazy_static;
59use operator::delete::DeleterRef;
60use operator::insert::InserterRef;
61use operator::statement::{StatementExecutor, StatementExecutorRef};
62use partition::manager::PartitionRuleManagerRef;
63use pipeline::pipeline_operator::PipelineOperator;
64use prometheus::HistogramTimer;
65use promql_parser::label::Matcher;
66use query::QueryEngineRef;
67use query::metrics::OnDone;
68use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
69use query::query_engine::DescribeResult;
70use query::query_engine::options::{QueryOptions, validate_catalog_and_schema};
71use servers::error::{
72 self as server_error, AuthSnafu, CommonMetaSnafu, ExecuteQuerySnafu,
73 OtlpMetricModeIncompatibleSnafu, ParsePromQLSnafu, UnexpectedResultSnafu,
74};
75use servers::interceptor::{
76 PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
77};
78use servers::otlp::metrics::legacy_normalize_otlp_name;
79use servers::prometheus_handler::PrometheusHandler;
80use servers::query_handler::sql::SqlQueryHandler;
81use session::context::{Channel, QueryContextRef};
82use session::table_name::table_idents_to_full_name;
83use snafu::prelude::*;
84use sql::ast::ObjectNamePartExt;
85use sql::dialect::Dialect;
86use sql::parser::{ParseOptions, ParserContext};
87use sql::statements::comment::CommentObject;
88use sql::statements::copy::{CopyDatabase, CopyTable};
89use sql::statements::statement::Statement;
90use sql::statements::tql::Tql;
91use sqlparser::ast::ObjectName;
92pub use standalone::StandaloneDatanodeManager;
93use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM};
94use tracing::Span;
95
96use crate::error::{
97 self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, InvalidSqlSnafu,
98 ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
99 StatementTimeoutSnafu, TableOperationSnafu,
100};
101use crate::stream_wrapper::CancellableStreamWrapper;
102
lazy_static! {
    /// Default OTLP metric compatibility mode. A table without an explicit
    /// `OTLP_METRIC_COMPAT_KEY` option is treated as "legacy".
    static ref OTLP_LEGACY_DEFAULT_VALUE: String = "legacy".to_string();
}
106
#[derive(Clone)]
pub struct Instance {
    /// Catalog/schema/table metadata access.
    catalog_manager: CatalogManagerRef,
    /// Pipeline (log transformation) operator.
    pipeline_operator: Arc<PipelineOperator>,
    /// Plans and executes SQL/TQL statements.
    statement_executor: Arc<StatementExecutor>,
    /// Underlying query engine used for planning and plan execution.
    query_engine: QueryEngineRef,
    /// Plugin registry (interceptors, permission checkers, query options).
    plugins: Plugins,
    inserter: InserterRef,
    deleter: DeleterRef,
    table_metadata_manager: TableMetadataManagerRef,
    /// Optional sink for recorded events (e.g. slow queries); `None` disables
    /// slow-query timing.
    event_recorder: Option<EventRecorderRef>,
    /// Tracks running queries for listing/cancellation.
    process_manager: ProcessManagerRef,
    slow_query_options: SlowQueryOptions,
    /// When set, query entry points reject new work with a "suspended" error.
    suspend: Arc<AtomicBool>,

    // Per-database cache of OTLP metric name -> legacy-mode flag, keyed by the
    // database string of the query context.
    otlp_metrics_table_legacy_cache: DashMap<String, DashMap<String, bool>>,
}
131
132impl Instance {
133 pub fn catalog_manager(&self) -> &CatalogManagerRef {
134 &self.catalog_manager
135 }
136
137 pub fn query_engine(&self) -> &QueryEngineRef {
138 &self.query_engine
139 }
140
141 pub fn plugins(&self) -> &Plugins {
142 &self.plugins
143 }
144
145 pub fn statement_executor(&self) -> &StatementExecutorRef {
146 &self.statement_executor
147 }
148
149 pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
150 &self.table_metadata_manager
151 }
152
153 pub fn inserter(&self) -> &InserterRef {
154 &self.inserter
155 }
156
157 pub fn process_manager(&self) -> &ProcessManagerRef {
158 &self.process_manager
159 }
160
161 pub fn node_manager(&self) -> &NodeManagerRef {
162 self.inserter.node_manager()
163 }
164
165 pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
166 self.inserter.partition_manager()
167 }
168
169 pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
170 self.statement_executor.cache_invalidator()
171 }
172
173 pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
174 self.statement_executor.procedure_executor()
175 }
176
177 pub fn suspend_state(&self) -> Arc<AtomicBool> {
178 self.suspend.clone()
179 }
180
181 pub(crate) fn is_suspended(&self) -> bool {
182 self.suspend.load(atomic::Ordering::Relaxed)
183 }
184}
185
186fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
187 ParserContext::create_with_dialect(sql, dialect, ParseOptions::default()).context(ParseSqlSnafu)
188}
189
190impl Instance {
    /// Executes a single parsed SQL statement after permission checks.
    ///
    /// Statements selected by [`should_capture_statement`] are registered with
    /// the process manager so they can be listed, cancelled, and sampled as
    /// slow queries; all other statements run directly (still subject to the
    /// session timeout applied in `exec_statement_with_timeout`).
    async fn query_statement(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
        check_permission(self.plugins.clone(), &stmt, &query_ctx)?;

        let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
        let query_interceptor = query_interceptor.as_ref();

        if should_capture_statement(Some(&stmt)) {
            // Only start a slow-query timer when the feature is enabled AND an
            // event recorder is configured.
            let slow_query_timer = self
                .slow_query_options
                .enable
                .then(|| self.event_recorder.clone())
                .flatten()
                .map(|event_recorder| {
                    SlowQueryTimer::new(
                        CatalogQueryStatement::Sql(stmt.clone()),
                        self.slow_query_options.threshold,
                        self.slow_query_options.sample_ratio,
                        self.slow_query_options.record_type,
                        event_recorder,
                    )
                });

            let ticket = self.process_manager.register_query(
                query_ctx.current_catalog().to_string(),
                vec![query_ctx.current_schema()],
                stmt.to_string(),
                query_ctx.conn_info().to_string(),
                Some(query_ctx.process_id()),
                slow_query_timer,
            );

            let query_fut = self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor);

            // Tie execution to the ticket's cancellation handle; for streaming
            // results, keep the ticket alive (and cancellable) for as long as
            // the stream is being consumed.
            CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
                .await
                .map_err(|_| error::CancelledSnafu.build())?
                .map(|output| {
                    let Output { meta, data } = output;

                    let data = match data {
                        OutputData::Stream(stream) => OutputData::Stream(Box::pin(
                            CancellableStreamWrapper::new(stream, ticket),
                        )),
                        other => other,
                    };
                    Output { data, meta }
                })
        } else {
            self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor)
                .await
        }
    }
243
244 async fn exec_statement_with_timeout(
245 &self,
246 stmt: Statement,
247 query_ctx: QueryContextRef,
248 query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
249 ) -> Result<Output> {
250 let timeout = derive_timeout(&stmt, &query_ctx);
251 match timeout {
252 Some(timeout) => {
253 let start = tokio::time::Instant::now();
254 let output = tokio::time::timeout(
255 timeout,
256 self.exec_statement(stmt, query_ctx, query_interceptor),
257 )
258 .await
259 .map_err(|_| StatementTimeoutSnafu.build())??;
260 let remaining_timeout = timeout.checked_sub(start.elapsed()).unwrap_or_default();
262 attach_timeout(output, remaining_timeout)
263 }
264 None => {
265 self.exec_statement(stmt, query_ctx, query_interceptor)
266 .await
267 }
268 }
269 }
270
271 async fn exec_statement(
272 &self,
273 stmt: Statement,
274 query_ctx: QueryContextRef,
275 query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
276 ) -> Result<Output> {
277 match stmt {
278 Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
279 if let Statement::Explain(explain) = &stmt
281 && let Some(format) = explain.format()
282 {
283 query_ctx.set_explain_format(format.to_string());
284 }
285
286 self.plan_and_exec_sql(stmt, &query_ctx, query_interceptor)
287 .await
288 }
289 Statement::Tql(tql) => {
290 self.plan_and_exec_tql(&query_ctx, query_interceptor, tql)
291 .await
292 }
293 _ => {
294 query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?;
295 self.statement_executor
296 .execute_sql(stmt, query_ctx)
297 .await
298 .context(TableOperationSnafu)
299 }
300 }
301 }
302
303 async fn plan_and_exec_sql(
304 &self,
305 stmt: Statement,
306 query_ctx: &QueryContextRef,
307 query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
308 ) -> Result<Output> {
309 let stmt = QueryStatement::Sql(stmt);
310 let plan = self
311 .statement_executor
312 .plan(&stmt, query_ctx.clone())
313 .await?;
314 let QueryStatement::Sql(stmt) = stmt else {
315 unreachable!()
316 };
317 query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?;
318
319 self.statement_executor
320 .exec_plan(plan, query_ctx.clone())
321 .await
322 .context(TableOperationSnafu)
323 }
324
325 async fn plan_and_exec_tql(
326 &self,
327 query_ctx: &QueryContextRef,
328 query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
329 tql: Tql,
330 ) -> Result<Output> {
331 let plan = self
332 .statement_executor
333 .plan_tql(tql.clone(), query_ctx)
334 .await?;
335 query_interceptor.pre_execute(&Statement::Tql(tql), Some(&plan), query_ctx.clone())?;
336 self.statement_executor
337 .exec_plan(plan, query_ctx.clone())
338 .await
339 .context(TableOperationSnafu)
340 }
341
    /// Determines whether the given OTLP metric names should be handled in
    /// legacy (normalized-name) mode for the database in `ctx`.
    ///
    /// Consults the per-database name cache first; on a miss, looks up table
    /// metadata for the legacy-normalized names and caches the verdict.
    /// Returns an error when the referenced tables mix "prom" and "legacy"
    /// compat modes in one request.
    async fn check_otlp_legacy(
        &self,
        names: &[&String],
        ctx: QueryContextRef,
    ) -> server_error::Result<bool> {
        let db_string = ctx.get_db_string();
        let cache = self
            .otlp_metrics_table_legacy_cache
            .entry(db_string.clone())
            .or_default();
        if let Some(flag) = fast_legacy_check(&cache, names)? {
            return Ok(flag);
        }
        // Release the DashMap entry guard before the awaits below and before
        // re-acquiring the entry, so no shard lock is held across an await or
        // a second acquisition.
        drop(cache);

        let catalog = ctx.current_catalog();
        let schema = ctx.current_schema();

        // Legacy mode stores metrics under normalized table names; probe for
        // tables with those names.
        let normalized_names = names
            .iter()
            .map(|n| legacy_normalize_otlp_name(n))
            .collect::<Vec<_>>();
        let table_names = normalized_names
            .iter()
            .map(|n| TableNameKey::new(catalog, &schema, n))
            .collect::<Vec<_>>();
        let table_values = self
            .table_metadata_manager()
            .table_name_manager()
            .batch_get(table_names)
            .await
            .context(CommonMetaSnafu)?;
        let table_ids = table_values
            .into_iter()
            .filter_map(|v| v.map(|vi| vi.table_id()))
            .collect::<Vec<_>>();

        // No normalized-name tables exist: not legacy. Cache the negatives.
        if table_ids.is_empty() {
            let cache = self
                .otlp_metrics_table_legacy_cache
                .entry(db_string)
                .or_default();
            names.iter().for_each(|name| {
                cache.insert((*name).clone(), false);
            });
            return Ok(false);
        }

        let table_infos = self
            .table_metadata_manager()
            .table_info_manager()
            .batch_get(&table_ids)
            .await
            .context(CommonMetaSnafu)?;
        // A table without an explicit compat option counts as "legacy".
        let options = table_infos
            .values()
            .map(|info| {
                info.table_info
                    .meta
                    .options
                    .extra_options
                    .get(OTLP_METRIC_COMPAT_KEY)
                    .unwrap_or(&OTLP_LEGACY_DEFAULT_VALUE)
            })
            .collect::<Vec<_>>();
        let cache = self
            .otlp_metrics_table_legacy_cache
            .entry(db_string)
            .or_default();
        if !options.is_empty() {
            let has_prom = options.iter().any(|opt| *opt == OTLP_METRIC_COMPAT_PROM);
            let has_legacy = options
                .iter()
                .any(|opt| *opt == OTLP_LEGACY_DEFAULT_VALUE.as_str());
            // Mixing prom-compat and legacy tables in one request is an error.
            ensure!(!(has_prom && has_legacy), OtlpMetricModeIncompatibleSnafu);
            let flag = has_legacy;
            names.iter().for_each(|name| {
                cache.insert((*name).clone(), flag);
            });
            Ok(flag)
        } else {
            // Tables resolved but carried no usable info: treat as non-legacy.
            names.iter().for_each(|name| {
                cache.insert((*name).clone(), false);
            });
            Ok(false)
        }
    }
436}
437
/// Checks the per-database cache for the legacy flag of the given metric names.
///
/// Returns `Ok(Some(flag))` when at least one name is cached (back-filling any
/// uncached names with the same flag), `Ok(None)` on a full cache miss, and an
/// error when cached entries disagree (some legacy, some prom).
fn fast_legacy_check(
    cache: &DashMap<String, bool>,
    names: &[&String],
) -> server_error::Result<Option<bool>> {
    let hit_cache = names
        .iter()
        .filter_map(|name| cache.get(*name))
        .collect::<Vec<_>>();
    if !hit_cache.is_empty() {
        // `true` means legacy mode, `false` means prom mode.
        let hit_legacy = hit_cache.iter().any(|en| *en.value());
        let hit_prom = hit_cache.iter().any(|en| !*en.value());

        // All cached names in a single request must agree on the mode.
        ensure!(!(hit_legacy && hit_prom), OtlpMetricModeIncompatibleSnafu);

        let flag = hit_legacy;
        // Drop the read guards before inserting: holding DashMap `Ref`s while
        // writing to the same map can deadlock on a shard lock.
        drop(hit_cache);

        names.iter().for_each(|name| {
            if !cache.contains_key(*name) {
                cache.insert((*name).clone(), flag);
            }
        });
        Ok(Some(flag))
    } else {
        Ok(None)
    }
}
470
471fn derive_timeout(stmt: &Statement, query_ctx: &QueryContextRef) -> Option<Duration> {
474 let query_timeout = query_ctx.query_timeout()?;
475 if query_timeout.is_zero() {
476 return None;
477 }
478 match query_ctx.channel() {
479 Channel::Mysql if stmt.is_readonly() => Some(query_timeout),
480 Channel::Postgres => Some(query_timeout),
481 _ => None,
482 }
483}
484
/// Wraps `output` so the remaining query-timeout budget keeps being enforced
/// while a result stream is consumed.
///
/// Fails immediately with a statement timeout when `timeout` is already zero.
/// Non-streaming outputs pass through unchanged; streaming outputs are wrapped
/// so each `next()` is bounded by the remaining budget, which is decremented
/// after every yielded batch.
fn attach_timeout(output: Output, mut timeout: Duration) -> Result<Output> {
    if timeout.is_zero() {
        return StatementTimeoutSnafu.fail();
    }

    let output = match output.data {
        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
        OutputData::Stream(mut stream) => {
            let schema = stream.schema();
            let s = Box::pin(stream! {
                let mut start = tokio::time::Instant::now();
                // Bound each poll by the remaining budget; shrink the budget by
                // the elapsed time after every yielded item.
                while let Some(item) = tokio::time::timeout(timeout, stream.next()).await.map_err(|_| StreamTimeoutSnafu.build())? {
                    yield item;

                    let now = tokio::time::Instant::now();
                    timeout = timeout.checked_sub(now - start).unwrap_or(Duration::ZERO);
                    start = now;
                    // Budget exhausted between items: abort the stream.
                    if timeout.is_zero() {
                        StreamTimeoutSnafu.fail()?;
                    }
                }
            }) as Pin<Box<dyn Stream<Item = _> + Send>>;
            let stream = RecordBatchStreamWrapper {
                schema,
                stream: s,
                output_ordering: None,
                metrics: Default::default(),
                span: Span::current(),
            };
            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
        }
    };

    Ok(output)
}
521
522impl Instance {
    /// Parses and executes a raw SQL string, returning one result per
    /// statement. Execution stops at the first permission failure or
    /// execution error; earlier results are still returned.
    #[tracing::instrument(skip_all, name = "SqlQueryHandler::do_query")]
    async fn do_query_inner(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
        if self.is_suspended() {
            return vec![error::SuspendedSnafu {}.fail()];
        }

        let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
        let query_interceptor = query_interceptor_opt.as_ref();
        // The interceptor may rewrite the raw SQL before parsing.
        let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) {
            Ok(q) => q,
            Err(e) => return vec![Err(e)],
        };

        let checker_ref = self.plugins.get::<PermissionCheckerRef>();
        let checker = checker_ref.as_ref();

        match parse_stmt(query.as_ref(), query_ctx.sql_dialect())
            .and_then(|stmts| query_interceptor.post_parsing(stmts, query_ctx.clone()))
        {
            Ok(stmts) => {
                if stmts.is_empty() {
                    return vec![
                        InvalidSqlSnafu {
                            err_msg: "empty statements",
                        }
                        .fail(),
                    ];
                }

                let mut results = Vec::with_capacity(stmts.len());
                for stmt in stmts {
                    // Per-statement permission check; stop on first denial.
                    if let Err(e) = checker
                        .check_permission(
                            query_ctx.current_user(),
                            PermissionReq::SqlStatement(&stmt),
                        )
                        .context(PermissionSnafu)
                    {
                        results.push(Err(e));
                        break;
                    }

                    match self.query_statement(stmt.clone(), query_ctx.clone()).await {
                        Ok(output) => {
                            let output_result =
                                query_interceptor.post_execute(output, query_ctx.clone());
                            results.push(output_result);
                        }
                        Err(e) => {
                            // Log severity follows the error's status code.
                            if e.status_code().should_log_error() {
                                error!(e; "Failed to execute query: {stmt}");
                            } else {
                                debug!("Failed to execute query: {stmt}, {e}");
                            }
                            results.push(Err(e));
                            break;
                        }
                    }
                }
                results
            }
            Err(e) => {
                vec![Err(e)]
            }
        }
    }
589
    /// Executes a pre-built logical plan. When the originating statement is
    /// known and capture-worthy, the query is registered with the process
    /// manager for cancellation and slow-query tracking.
    async fn do_exec_plan_inner(
        &self,
        stmt: Option<Statement>,
        plan: LogicalPlan,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        ensure!(!self.is_suspended(), error::SuspendedSnafu);

        if should_capture_statement(stmt.as_ref()) {
            // Safe: `should_capture_statement` only returns true for `Some`.
            let stmt = stmt.unwrap();
            let query = stmt.to_string();
            // Only start a slow-query timer when enabled AND a recorder exists.
            let slow_query_timer = self
                .slow_query_options
                .enable
                .then(|| self.event_recorder.clone())
                .flatten()
                .map(|event_recorder| {
                    SlowQueryTimer::new(
                        CatalogQueryStatement::Sql(stmt.clone()),
                        self.slow_query_options.threshold,
                        self.slow_query_options.sample_ratio,
                        self.slow_query_options.record_type,
                        event_recorder,
                    )
                });

            let ticket = self.process_manager.register_query(
                query_ctx.current_catalog().to_string(),
                vec![query_ctx.current_schema()],
                query,
                query_ctx.conn_info().to_string(),
                Some(query_ctx.process_id()),
                slow_query_timer,
            );

            let query_fut = self.query_engine.execute(plan.clone(), query_ctx);

            // Tie execution to the cancellation handle; streaming results keep
            // the ticket alive while being consumed.
            CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
                .await
                .map_err(|_| error::CancelledSnafu.build())?
                .map(|output| {
                    let Output { meta, data } = output;

                    let data = match data {
                        OutputData::Stream(stream) => OutputData::Stream(Box::pin(
                            CancellableStreamWrapper::new(stream, ticket),
                        )),
                        other => other,
                    };
                    Output { data, meta }
                })
                .context(ExecLogicalPlanSnafu)
        } else {
            self.query_engine
                .execute(plan.clone(), query_ctx)
                .await
                .context(ExecLogicalPlanSnafu)
        }
    }
652
653 #[tracing::instrument(skip_all, name = "SqlQueryHandler::do_promql_query")]
654 async fn do_promql_query_inner(
655 &self,
656 query: &PromQuery,
657 query_ctx: QueryContextRef,
658 ) -> Vec<Result<Output>> {
659 if self.is_suspended() {
660 return vec![error::SuspendedSnafu {}.fail()];
661 }
662
663 let result = PrometheusHandler::do_query(self, query, query_ctx)
665 .await
666 .with_context(|_| ExecutePromqlSnafu {
667 query: format!("{query:?}"),
668 });
669 vec![result]
670 }
671
672 async fn do_describe_inner(
673 &self,
674 stmt: Statement,
675 query_ctx: QueryContextRef,
676 ) -> Result<Option<DescribeResult>> {
677 ensure!(!self.is_suspended(), error::SuspendedSnafu);
678
679 if matches!(
680 stmt,
681 Statement::Insert(_) | Statement::Query(_) | Statement::Delete(_)
682 ) {
683 self.plugins
684 .get::<PermissionCheckerRef>()
685 .as_ref()
686 .check_permission(query_ctx.current_user(), PermissionReq::SqlStatement(&stmt))
687 .context(PermissionSnafu)?;
688
689 let plan = self
690 .query_engine
691 .planner()
692 .plan(&QueryStatement::Sql(stmt), query_ctx.clone())
693 .await
694 .context(PlanStatementSnafu)?;
695 self.query_engine
696 .describe(plan, query_ctx)
697 .await
698 .map(Some)
699 .context(error::DescribeStatementSnafu)
700 } else {
701 Ok(None)
702 }
703 }
704
705 async fn is_valid_schema_inner(&self, catalog: &str, schema: &str) -> Result<bool> {
706 self.catalog_manager
707 .schema_exists(catalog, schema, None)
708 .await
709 .context(error::CatalogSnafu)
710 }
711}
712
713#[async_trait]
714impl SqlQueryHandler for Instance {
715 async fn do_query(
716 &self,
717 query: &str,
718 query_ctx: QueryContextRef,
719 ) -> Vec<server_error::Result<Output>> {
720 self.do_query_inner(query, query_ctx)
721 .await
722 .into_iter()
723 .map(|result| result.map_err(BoxedError::new).context(ExecuteQuerySnafu))
724 .collect()
725 }
726
727 async fn do_exec_plan(
728 &self,
729 stmt: Option<Statement>,
730 plan: LogicalPlan,
731 query_ctx: QueryContextRef,
732 ) -> server_error::Result<Output> {
733 self.do_exec_plan_inner(stmt, plan, query_ctx)
734 .await
735 .map_err(BoxedError::new)
736 .context(server_error::ExecutePlanSnafu)
737 }
738
739 async fn do_promql_query(
740 &self,
741 query: &PromQuery,
742 query_ctx: QueryContextRef,
743 ) -> Vec<server_error::Result<Output>> {
744 self.do_promql_query_inner(query, query_ctx)
745 .await
746 .into_iter()
747 .map(|result| result.map_err(BoxedError::new).context(ExecuteQuerySnafu))
748 .collect()
749 }
750
751 async fn do_describe(
752 &self,
753 stmt: Statement,
754 query_ctx: QueryContextRef,
755 ) -> server_error::Result<Option<DescribeResult>> {
756 self.do_describe_inner(stmt, query_ctx)
757 .await
758 .map_err(BoxedError::new)
759 .context(server_error::DescribeStatementSnafu)
760 }
761
762 async fn is_valid_schema(&self, catalog: &str, schema: &str) -> server_error::Result<bool> {
763 self.is_valid_schema_inner(catalog, schema)
764 .await
765 .map_err(BoxedError::new)
766 .context(server_error::CheckDatabaseValiditySnafu)
767 }
768}
769
770pub fn attach_timer(output: Output, timer: HistogramTimer) -> Output {
772 match output.data {
773 OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
774 OutputData::Stream(stream) => {
775 let stream = OnDone::new(stream, move || {
776 timer.observe_duration();
777 });
778 Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
779 }
780 }
781}
782
783#[async_trait]
784impl PrometheusHandler for Instance {
    /// Executes a PromQL query: permission check, parse, plan, interceptor
    /// hooks, and cancellable execution registered with the process manager.
    #[tracing::instrument(skip_all)]
    async fn do_query(
        &self,
        query: &PromQuery,
        query_ctx: QueryContextRef,
    ) -> server_error::Result<Output> {
        let interceptor = self
            .plugins
            .get::<PromQueryInterceptorRef<server_error::Error>>();

        self.plugins
            .get::<PermissionCheckerRef>()
            .as_ref()
            .check_permission(query_ctx.current_user(), PermissionReq::PromQuery)
            .context(AuthSnafu)?;

        let stmt = QueryLanguageParser::parse_promql(query, &query_ctx).with_context(|_| {
            ParsePromQLSnafu {
                query: query.clone(),
            }
        })?;

        let plan = self
            .statement_executor
            .plan(&stmt, query_ctx.clone())
            .await
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)?;

        interceptor.pre_execute(query, Some(&plan), query_ctx.clone())?;

        // `parse_promql` always yields a PromQL statement; anything else is a
        // logic error reported as UnexpectedResult.
        let query_statement = if let QueryStatement::Promql(eval_stmt, alias) = stmt {
            CatalogQueryStatement::Promql(eval_stmt, alias)
        } else {
            return UnexpectedResultSnafu {
                reason: "The query should always be promql.".to_string(),
            }
            .fail();
        };
        // Shadow the `&PromQuery` param with its display form for registration.
        let query = query_statement.to_string();

        // Only start a slow-query timer when enabled AND a recorder exists.
        let slow_query_timer = self
            .slow_query_options
            .enable
            .then(|| self.event_recorder.clone())
            .flatten()
            .map(|event_recorder| {
                SlowQueryTimer::new(
                    query_statement,
                    self.slow_query_options.threshold,
                    self.slow_query_options.sample_ratio,
                    self.slow_query_options.record_type,
                    event_recorder,
                )
            });

        let ticket = self.process_manager.register_query(
            query_ctx.current_catalog().to_string(),
            vec![query_ctx.current_schema()],
            query,
            query_ctx.conn_info().to_string(),
            Some(query_ctx.process_id()),
            slow_query_timer,
        );

        let query_fut = self.statement_executor.exec_plan(plan, query_ctx.clone());

        // Tie execution to the cancellation handle; streaming results keep the
        // ticket alive while being consumed.
        let output = CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
            .await
            .map_err(|_| servers::error::CancelledSnafu.build())?
            .map(|output| {
                let Output { meta, data } = output;
                let data = match data {
                    OutputData::Stream(stream) => {
                        OutputData::Stream(Box::pin(CancellableStreamWrapper::new(stream, ticket)))
                    }
                    other => other,
                };
                Output { data, meta }
            })
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)?;

        Ok(interceptor.post_execute(output, query_ctx)?)
    }
872
873 async fn query_metric_names(
874 &self,
875 matchers: Vec<Matcher>,
876 ctx: &QueryContextRef,
877 ) -> server_error::Result<Vec<String>> {
878 self.handle_query_metric_names(matchers, ctx)
879 .await
880 .map_err(BoxedError::new)
881 .context(ExecuteQuerySnafu)
882 }
883
884 async fn query_label_values(
885 &self,
886 metric: String,
887 label_name: String,
888 matchers: Vec<Matcher>,
889 start: SystemTime,
890 end: SystemTime,
891 ctx: &QueryContextRef,
892 ) -> server_error::Result<Vec<String>> {
893 self.handle_query_label_values(metric, label_name, matchers, start, end, ctx)
894 .await
895 .map_err(BoxedError::new)
896 .context(ExecuteQuerySnafu)
897 }
898
    /// Returns a cloned shared handle to the catalog manager.
    fn catalog_manager(&self) -> CatalogManagerRef {
        self.catalog_manager.clone()
    }
902}
903
/// Validates that `$stmt.database` (when present) stays within the current
/// catalog of `$query_ctx`; expands to an early `return Err(..)` (`?`) on
/// violation. Usable only in functions returning `Result<_, Error>`.
macro_rules! validate_db_permission {
    ($stmt: expr, $query_ctx: expr) => {
        if let Some(database) = &$stmt.database {
            validate_catalog_and_schema($query_ctx.current_catalog(), database, $query_ctx)
                .map_err(BoxedError::new)
                .context(SqlExecInterceptedSnafu)?;
        }
    };
}
914
/// Checks that `stmt` only touches catalogs/schemas the current session may
/// access. A no-op unless `QueryOptions::disallow_cross_catalog_query` is set.
///
/// Statements whose object names are resolved later (plain queries, TQL,
/// database-level DDL, etc.) are allowed through here; statements that name
/// tables/flows/databases explicitly are validated against the session's
/// current catalog.
pub fn check_permission(
    plugins: Plugins,
    stmt: &Statement,
    query_ctx: &QueryContextRef,
) -> Result<()> {
    let need_validate = plugins
        .get::<QueryOptions>()
        .map(|opts| opts.disallow_cross_catalog_query)
        .unwrap_or_default();

    if !need_validate {
        return Ok(());
    }

    match stmt {
        // Admin commands are not scoped to a catalog here.
        Statement::Admin(_) => {}
        // Table-name resolution for these happens during planning.
        Statement::Query(_)
        | Statement::Explain(_)
        | Statement::Tql(_)
        | Statement::Delete(_)
        | Statement::DeclareCursor(_)
        | Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(_)) => {}
        // Database-level operations are validated elsewhere.
        Statement::CreateDatabase(_)
        | Statement::ShowDatabases(_)
        | Statement::DropDatabase(_)
        | Statement::AlterDatabase(_)
        | Statement::DropFlow(_)
        | Statement::Use(_) => {}
        #[cfg(feature = "enterprise")]
        Statement::DropTrigger(_) => {}
        Statement::ShowCreateDatabase(stmt) => {
            validate_database(&stmt.database_name, query_ctx)?;
        }
        Statement::ShowCreateTable(stmt) => {
            validate_param(&stmt.table_name, query_ctx)?;
        }
        Statement::ShowCreateFlow(stmt) => {
            validate_flow(&stmt.flow_name, query_ctx)?;
        }
        #[cfg(feature = "enterprise")]
        Statement::ShowCreateTrigger(stmt) => {
            validate_param(&stmt.trigger_name, query_ctx)?;
        }
        Statement::ShowCreateView(stmt) => {
            validate_param(&stmt.view_name, query_ctx)?;
        }
        Statement::CreateExternalTable(stmt) => {
            validate_param(&stmt.name, query_ctx)?;
        }
        Statement::CreateFlow(stmt) => {
            // Only the sink table name is checked here.
            validate_param(&stmt.sink_table_name, query_ctx)?;
        }
        #[cfg(feature = "enterprise")]
        Statement::CreateTrigger(stmt) => {
            validate_param(&stmt.trigger_name, query_ctx)?;
        }
        Statement::CreateView(stmt) => {
            validate_param(&stmt.name, query_ctx)?;
        }
        Statement::AlterTable(stmt) => {
            validate_param(stmt.table_name(), query_ctx)?;
        }
        #[cfg(feature = "enterprise")]
        Statement::AlterTrigger(_) => {}
        // Session-level statements carry no object names to validate.
        Statement::SetVariables(_) | Statement::ShowVariables(_) => {}
        Statement::ShowCharset(_) | Statement::ShowCollation(_) => {}

        Statement::Comment(comment) => match &comment.object {
            CommentObject::Table(table) => validate_param(table, query_ctx)?,
            CommentObject::Column { table, .. } => validate_param(table, query_ctx)?,
            CommentObject::Flow(flow) => validate_flow(flow, query_ctx)?,
        },

        Statement::Insert(insert) => {
            let name = insert.table_name().context(ParseSqlSnafu)?;
            validate_param(name, query_ctx)?;
        }
        Statement::CreateTable(stmt) => {
            validate_param(&stmt.name, query_ctx)?;
        }
        Statement::CreateTableLike(stmt) => {
            validate_param(&stmt.table_name, query_ctx)?;
            validate_param(&stmt.source_name, query_ctx)?;
        }
        Statement::DropTable(drop_stmt) => {
            for table_name in drop_stmt.table_names() {
                validate_param(table_name, query_ctx)?;
            }
        }
        Statement::DropView(stmt) => {
            validate_param(&stmt.view_name, query_ctx)?;
        }
        Statement::ShowTables(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowTableStatus(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowColumns(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowIndex(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowRegion(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowViews(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowFlows(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        #[cfg(feature = "enterprise")]
        Statement::ShowTriggers(_stmt) => {}
        Statement::ShowStatus(_stmt) => {}
        Statement::ShowSearchPath(_stmt) => {}
        Statement::DescribeTable(stmt) => {
            validate_param(stmt.name(), query_ctx)?;
        }
        Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => match stmt {
            CopyTable::To(copy_table_to) => validate_param(&copy_table_to.table_name, query_ctx)?,
            CopyTable::From(copy_table_from) => {
                validate_param(&copy_table_from.table_name, query_ctx)?
            }
        },
        Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
            match copy_database {
                CopyDatabase::To(stmt) => validate_database(&stmt.database_name, query_ctx)?,
                CopyDatabase::From(stmt) => validate_database(&stmt.database_name, query_ctx)?,
            }
        }
        Statement::TruncateTable(stmt) => {
            validate_param(stmt.table_name(), query_ctx)?;
        }
        Statement::FetchCursor(_) | Statement::CloseCursor(_) => {}
        Statement::Kill(_) => {}
        Statement::ShowProcesslist(_) => {}
    }
    Ok(())
}
1069
1070fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
1071 let (catalog, schema, _) = table_idents_to_full_name(name, query_ctx)
1072 .map_err(BoxedError::new)
1073 .context(ExternalSnafu)?;
1074
1075 validate_catalog_and_schema(&catalog, &schema, query_ctx)
1076 .map_err(BoxedError::new)
1077 .context(SqlExecInterceptedSnafu)
1078}
1079
1080fn validate_flow(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
1081 let catalog = match &name.0[..] {
1082 [_flow] => query_ctx.current_catalog().to_string(),
1083 [catalog, _flow] => catalog.to_string_unquoted(),
1084 _ => {
1085 return InvalidSqlSnafu {
1086 err_msg: format!(
1087 "expect flow name to be <catalog>.<flow_name> or <flow_name>, actual: {name}",
1088 ),
1089 }
1090 .fail();
1091 }
1092 };
1093
1094 let schema = query_ctx.current_schema();
1095
1096 validate_catalog_and_schema(&catalog, &schema, query_ctx)
1097 .map_err(BoxedError::new)
1098 .context(SqlExecInterceptedSnafu)
1099}
1100
1101fn validate_database(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
1102 let (catalog, schema) = match &name.0[..] {
1103 [schema] => (
1104 query_ctx.current_catalog().to_string(),
1105 schema.to_string_unquoted(),
1106 ),
1107 [catalog, schema] => (catalog.to_string_unquoted(), schema.to_string_unquoted()),
1108 _ => InvalidSqlSnafu {
1109 err_msg: format!(
1110 "expect database name to be <catalog>.<schema> or <schema>, actual: {name}",
1111 ),
1112 }
1113 .fail()?,
1114 };
1115
1116 validate_catalog_and_schema(&catalog, &schema, query_ctx)
1117 .map_err(BoxedError::new)
1118 .context(SqlExecInterceptedSnafu)
1119}
1120
1121fn should_capture_statement(stmt: Option<&Statement>) -> bool {
1123 if let Some(stmt) = stmt {
1124 matches!(stmt, Statement::Query(_)) || stmt.is_readonly()
1125 } else {
1126 false
1127 }
1128}
1129
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::{Arc, Barrier};
    use std::thread;
    use std::time::{Duration, Instant};

    use common_base::Plugins;
    use query::query_engine::options::QueryOptions;
    use session::context::QueryContext;
    use sql::dialect::GreptimeDbDialect;
    use strfmt::Format;

    use super::*;

    #[test]
    fn test_fast_legacy_check_deadlock_prevention() {
        let cache = DashMap::new();

        // Seed the cache with a mix of legacy (true) and non-legacy (false) metrics.
        cache.insert("metric1".to_string(), true);
        cache.insert("metric2".to_string(), false);
        cache.insert("metric3".to_string(), true);

        // A known legacy metric plus an unknown one: the result is `Some(true)`
        // and the unknown metric is inserted into the cache as legacy too.
        let metric1 = "metric1".to_string();
        let metric4 = "metric4".to_string();
        let names1 = vec![&metric1, &metric4];
        let result = fast_legacy_check(&cache, &names1);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), Some(true));
        assert!(cache.contains_key("metric4"));
        assert!(*cache.get("metric4").unwrap().value());

        // All-unknown metrics: no determination can be made, so `None` is returned.
        let metric5 = "metric5".to_string();
        let metric6 = "metric6".to_string();
        let names2 = vec![&metric5, &metric6];
        let result = fast_legacy_check(&cache, &names2);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), None);

        // Mixing a legacy metric and a non-legacy metric in one request is an error.
        let cache_incompatible = DashMap::new();
        cache_incompatible.insert("metric1".to_string(), true);
        cache_incompatible.insert("metric2".to_string(), false);
        let metric1_test = "metric1".to_string();
        let metric2_test = "metric2".to_string();
        let names3 = vec![&metric1_test, &metric2_test];
        let result = fast_legacy_check(&cache_incompatible, &names3);
        assert!(result.is_err());

        // Concurrency: hammer a shared cache from many threads at once to verify
        // `fast_legacy_check` does not deadlock while reading and inserting.
        let cache_concurrent = Arc::new(DashMap::new());
        cache_concurrent.insert("shared_metric".to_string(), true);

        let num_threads = 8;
        let operations_per_thread = 100;
        let barrier = Arc::new(Barrier::new(num_threads));
        let success_flag = Arc::new(AtomicBool::new(true));

        let handles: Vec<_> = (0..num_threads)
            .map(|thread_id| {
                let cache_clone = Arc::clone(&cache_concurrent);
                let barrier_clone = Arc::clone(&barrier);
                let success_flag_clone = Arc::clone(&success_flag);

                thread::spawn(move || {
                    // Release all threads simultaneously to maximize contention.
                    barrier_clone.wait();

                    let start_time = Instant::now();
                    for i in 0..operations_per_thread {
                        let shared_metric = "shared_metric".to_string();
                        let new_metric = format!("thread_{}_metric_{}", thread_id, i);
                        let names = vec![&shared_metric, &new_metric];

                        match fast_legacy_check(&cache_clone, &names) {
                            Ok(_) => {}
                            Err(_) => {
                                success_flag_clone.store(false, Ordering::Relaxed);
                                return;
                            }
                        }

                        // Per-thread safety valve: a run this slow suggests lock
                        // contention or a deadlock rather than normal execution.
                        if start_time.elapsed() > Duration::from_secs(10) {
                            success_flag_clone.store(false, Ordering::Relaxed);
                            return;
                        }
                    }
                })
            })
            .collect();

        let start_time = Instant::now();
        for (i, handle) in handles.into_iter().enumerate() {
            let join_result = handle.join();

            // Overall safety valve for the whole join phase.
            if start_time.elapsed() > Duration::from_secs(30) {
                panic!("Test timed out - possible deadlock detected!");
            }

            if join_result.is_err() {
                panic!("Thread {} panicked during execution", i);
            }
        }

        assert!(
            success_flag.load(Ordering::Relaxed),
            "Some operations failed"
        );

        // Each thread inserted unique metric names, so the cache must have grown
        // well beyond the single seeded entry.
        let final_count = cache_concurrent.len();
        assert!(
            final_count > 1 + num_threads * operations_per_thread / 2,
            "Expected more cache entries, got {}",
            final_count
        );
    }

    #[test]
    fn test_exec_validation() {
        let query_ctx = QueryContext::arc();
        let plugins: Plugins = Plugins::new();
        plugins.insert(QueryOptions {
            disallow_cross_catalog_query: true,
        });

        // Statements without explicit catalog/schema are always permitted.
        let sql = r#"
        SELECT * FROM demo;
        EXPLAIN SELECT * FROM demo;
        CREATE DATABASE test_database;
        SHOW DATABASES;
        "#;
        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        assert_eq!(stmts.len(), 4);
        for stmt in stmts {
            let re = check_permission(plugins.clone(), &stmt, &query_ctx);
            re.unwrap();
        }

        let sql = r#"
        SHOW CREATE TABLE demo;
        ALTER TABLE demo ADD COLUMN new_col INT;
        "#;
        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        assert_eq!(stmts.len(), 2);
        for stmt in stmts {
            let re = check_permission(plugins.clone(), &stmt, &query_ctx);
            re.unwrap();
        }

        // Expands `{catalog}`/`{schema}` placeholders in `template_sql` with
        // permitted and forbidden qualifiers and checks both outcomes.
        fn replace_test(template_sql: &str, plugins: Plugins, query_ctx: &QueryContextRef) {
            // Qualifiers within the session's own catalog must pass.
            let right = vec![("", ""), ("", "public."), ("greptime.", "public.")];
            for (catalog, schema) in right {
                let sql = do_fmt(template_sql, catalog, schema);
                do_test(&sql, plugins.clone(), query_ctx, true);
            }

            // Cross-catalog qualifiers must be rejected.
            let wrong = vec![
                ("wrongcatalog.", "public."),
                ("wrongcatalog.", "wrongschema."),
            ];
            for (catalog, schema) in wrong {
                let sql = do_fmt(template_sql, catalog, schema);
                do_test(&sql, plugins.clone(), query_ctx, false);
            }
        }

        fn do_fmt(template: &str, catalog: &str, schema: &str) -> String {
            let vars = HashMap::from([
                ("catalog".to_string(), catalog),
                ("schema".to_string(), schema),
            ]);
            template.format(&vars).unwrap()
        }

        fn do_test(sql: &str, plugins: Plugins, query_ctx: &QueryContextRef, is_ok: bool) {
            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
            let re = check_permission(plugins, stmt, query_ctx);
            if is_ok {
                re.unwrap();
            } else {
                assert!(re.is_err());
            }
        }

        // INSERT
        let sql = "INSERT INTO {catalog}{schema}monitor(host) VALUES ('host1');";
        replace_test(sql, plugins.clone(), &query_ctx);

        // CREATE TABLE
        let sql = r#"CREATE TABLE {catalog}{schema}demo(
                            host STRING,
                            ts TIMESTAMP,
                            TIME INDEX (ts),
                            PRIMARY KEY(host)
                        ) engine=mito;"#;
        replace_test(sql, plugins.clone(), &query_ctx);

        // DROP TABLE
        let sql = "DROP TABLE {catalog}{schema}demo;";
        replace_test(sql, plugins.clone(), &query_ctx);

        // SHOW TABLES from a schema in the current catalog is allowed.
        let sql = "SHOW TABLES FROM public";
        let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        check_permission(plugins.clone(), &stmt[0], &query_ctx).unwrap();

        let sql = "SHOW TABLES FROM private";
        let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        let re = check_permission(plugins.clone(), &stmt[0], &query_ctx);
        assert!(re.is_ok());

        // DESC TABLE
        let sql = "DESC TABLE {catalog}{schema}demo;";
        replace_test(sql, plugins.clone(), &query_ctx);

        // COMMENT ON FLOW: only same-catalog flows may be commented on.
        let comment_flow_cases = [
            ("COMMENT ON FLOW my_flow IS 'comment';", true),
            ("COMMENT ON FLOW greptime.my_flow IS 'comment';", true),
            ("COMMENT ON FLOW wrongcatalog.my_flow IS 'comment';", false),
        ];
        for (sql, is_ok) in comment_flow_cases {
            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
            let result = check_permission(plugins.clone(), stmt, &query_ctx);
            assert_eq!(result.is_ok(), is_ok);
        }

        // SHOW CREATE FLOW: same catalog-boundary rule applies.
        let show_flow_cases = [
            ("SHOW CREATE FLOW my_flow;", true),
            ("SHOW CREATE FLOW greptime.my_flow;", true),
            ("SHOW CREATE FLOW wrongcatalog.my_flow;", false),
        ];
        for (sql, is_ok) in show_flow_cases {
            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
            let result = check_permission(plugins.clone(), stmt, &query_ctx);
            assert_eq!(result.is_ok(), is_ok);
        }
    }
}