1pub mod builder;
16mod grpc;
17mod influxdb;
18mod jaeger;
19mod log_handler;
20mod logs;
21mod opentsdb;
22mod otlp;
23pub mod prom_store;
24mod promql;
25mod region_query;
26pub mod standalone;
27
28use std::pin::Pin;
29use std::sync::atomic::AtomicBool;
30use std::sync::{Arc, atomic};
31use std::time::{Duration, SystemTime};
32
33use async_stream::stream;
34use async_trait::async_trait;
35use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
36use catalog::CatalogManagerRef;
37use catalog::process_manager::{
38 ProcessManagerRef, QueryStatement as CatalogQueryStatement, SlowQueryTimer,
39};
40use client::OutputData;
41use common_base::Plugins;
42use common_base::cancellation::CancellableFuture;
43use common_error::ext::{BoxedError, ErrorExt};
44use common_event_recorder::EventRecorderRef;
45use common_meta::cache_invalidator::CacheInvalidatorRef;
46use common_meta::key::TableMetadataManagerRef;
47use common_meta::key::table_name::TableNameKey;
48use common_meta::node_manager::NodeManagerRef;
49use common_meta::procedure_executor::ProcedureExecutorRef;
50use common_query::Output;
51use common_recordbatch::RecordBatchStreamWrapper;
52use common_recordbatch::error::StreamTimeoutSnafu;
53use common_telemetry::logging::SlowQueryOptions;
54use common_telemetry::{debug, error, tracing};
55use dashmap::DashMap;
56use datafusion_expr::LogicalPlan;
57use futures::{Stream, StreamExt};
58use lazy_static::lazy_static;
59use operator::delete::DeleterRef;
60use operator::insert::InserterRef;
61use operator::statement::{StatementExecutor, StatementExecutorRef};
62use partition::manager::PartitionRuleManagerRef;
63use pipeline::pipeline_operator::PipelineOperator;
64use prometheus::HistogramTimer;
65use promql_parser::label::Matcher;
66use query::QueryEngineRef;
67use query::metrics::OnDone;
68use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
69use query::query_engine::DescribeResult;
70use query::query_engine::options::{QueryOptions, validate_catalog_and_schema};
71use servers::error::{
72 self as server_error, AuthSnafu, CommonMetaSnafu, ExecuteQuerySnafu,
73 OtlpMetricModeIncompatibleSnafu, ParsePromQLSnafu, UnexpectedResultSnafu,
74};
75use servers::interceptor::{
76 PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
77};
78use servers::otlp::metrics::legacy_normalize_otlp_name;
79use servers::prometheus_handler::PrometheusHandler;
80use servers::query_handler::sql::SqlQueryHandler;
81use session::context::{Channel, QueryContextRef};
82use session::table_name::table_idents_to_full_name;
83use snafu::prelude::*;
84use sql::ast::ObjectNamePartExt;
85use sql::dialect::Dialect;
86use sql::parser::{ParseOptions, ParserContext};
87use sql::statements::comment::CommentObject;
88use sql::statements::copy::{CopyDatabase, CopyTable};
89use sql::statements::statement::Statement;
90use sql::statements::tql::Tql;
91use sqlparser::ast::ObjectName;
92pub use standalone::StandaloneDatanodeManager;
93use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM};
94use tracing::Span;
95
96use crate::error::{
97 self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, InvalidSqlSnafu,
98 ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
99 StatementTimeoutSnafu, TableOperationSnafu,
100};
101use crate::stream_wrapper::CancellableStreamWrapper;
102
// Fallback compat mode used when a table's extra options do not contain
// `OTLP_METRIC_COMPAT_KEY`; such tables are treated as "legacy".
lazy_static! {
    static ref OTLP_LEGACY_DEFAULT_VALUE: String = "legacy".to_string();
}
106
/// Frontend instance bundling the components used to parse, authorize, plan and execute
/// SQL and PromQL queries.
#[derive(Clone)]
pub struct Instance {
    /// Catalog access; used, for example, to check whether a schema exists.
    catalog_manager: CatalogManagerRef,
    // NOTE(review): not referenced in the part of the file visible here.
    pipeline_operator: Arc<PipelineOperator>,
    /// Plans statements and executes plans / non-query statements.
    statement_executor: Arc<StatementExecutor>,
    /// Engine used to execute and describe logical plans.
    query_engine: QueryEngineRef,
    /// Plugin registry holding interceptors, the permission checker and query options.
    plugins: Plugins,
    /// Inserter; also the source of the node manager and partition manager accessors.
    inserter: InserterRef,
    // NOTE(review): not referenced in the part of the file visible here.
    deleter: DeleterRef,
    /// Table metadata access (table names and table infos).
    table_metadata_manager: TableMetadataManagerRef,
    /// Optional recorder handed to slow query timers when slow query recording is enabled.
    event_recorder: Option<EventRecorderRef>,
    /// Registry of running queries; registration yields a ticket with a cancellation handle.
    process_manager: ProcessManagerRef,
    /// Slow query settings (enable flag, threshold, sample ratio, record type).
    slow_query_options: SlowQueryOptions,
    /// Shared flag; while it is `true` the query handlers reject requests as suspended.
    suspend: Arc<AtomicBool>,

    /// Per-database cache (keyed by the context's db string) mapping an OTLP metric name
    /// to whether it is handled in legacy mode.
    otlp_metrics_table_legacy_cache: DashMap<String, DashMap<String, bool>>,
}
131
132impl Instance {
133 pub fn catalog_manager(&self) -> &CatalogManagerRef {
134 &self.catalog_manager
135 }
136
137 pub fn query_engine(&self) -> &QueryEngineRef {
138 &self.query_engine
139 }
140
141 pub fn plugins(&self) -> &Plugins {
142 &self.plugins
143 }
144
145 pub fn statement_executor(&self) -> &StatementExecutorRef {
146 &self.statement_executor
147 }
148
149 pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
150 &self.table_metadata_manager
151 }
152
153 pub fn inserter(&self) -> &InserterRef {
154 &self.inserter
155 }
156
157 pub fn process_manager(&self) -> &ProcessManagerRef {
158 &self.process_manager
159 }
160
161 pub fn node_manager(&self) -> &NodeManagerRef {
162 self.inserter.node_manager()
163 }
164
165 pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
166 self.inserter.partition_manager()
167 }
168
169 pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
170 self.statement_executor.cache_invalidator()
171 }
172
173 pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
174 self.statement_executor.procedure_executor()
175 }
176
177 pub fn suspend_state(&self) -> Arc<AtomicBool> {
178 self.suspend.clone()
179 }
180
181 pub(crate) fn is_suspended(&self) -> bool {
182 self.suspend.load(atomic::Ordering::Relaxed)
183 }
184}
185
186fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
187 ParserContext::create_with_dialect(sql, dialect, ParseOptions::default()).context(ParseSqlSnafu)
188}
189
190impl Instance {
191 async fn query_statement(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
192 check_permission(self.plugins.clone(), &stmt, &query_ctx)?;
193
194 let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
195 let query_interceptor = query_interceptor.as_ref();
196
197 if should_capture_statement(Some(&stmt)) {
198 let slow_query_timer = self
199 .slow_query_options
200 .enable
201 .then(|| self.event_recorder.clone())
202 .flatten()
203 .map(|event_recorder| {
204 SlowQueryTimer::new(
205 CatalogQueryStatement::Sql(stmt.clone()),
206 self.slow_query_options.threshold,
207 self.slow_query_options.sample_ratio,
208 self.slow_query_options.record_type,
209 event_recorder,
210 )
211 });
212
213 let ticket = self.process_manager.register_query(
214 query_ctx.current_catalog().to_string(),
215 vec![query_ctx.current_schema()],
216 stmt.to_string(),
217 query_ctx.conn_info().to_string(),
218 Some(query_ctx.process_id()),
219 slow_query_timer,
220 );
221
222 let query_fut = self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor);
223
224 CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
225 .await
226 .map_err(|_| error::CancelledSnafu.build())?
227 .map(|output| {
228 let Output { meta, data } = output;
229
230 let data = match data {
231 OutputData::Stream(stream) => OutputData::Stream(Box::pin(
232 CancellableStreamWrapper::new(stream, ticket),
233 )),
234 other => other,
235 };
236 Output { data, meta }
237 })
238 } else {
239 self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor)
240 .await
241 }
242 }
243
244 async fn exec_statement_with_timeout(
245 &self,
246 stmt: Statement,
247 query_ctx: QueryContextRef,
248 query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
249 ) -> Result<Output> {
250 let timeout = derive_timeout(&stmt, &query_ctx);
251 match timeout {
252 Some(timeout) => {
253 let start = tokio::time::Instant::now();
254 let output = tokio::time::timeout(
255 timeout,
256 self.exec_statement(stmt, query_ctx, query_interceptor),
257 )
258 .await
259 .map_err(|_| StatementTimeoutSnafu.build())??;
260 let remaining_timeout = timeout.checked_sub(start.elapsed()).unwrap_or_default();
262 attach_timeout(output, remaining_timeout)
263 }
264 None => {
265 self.exec_statement(stmt, query_ctx, query_interceptor)
266 .await
267 }
268 }
269 }
270
271 async fn exec_statement(
272 &self,
273 stmt: Statement,
274 query_ctx: QueryContextRef,
275 query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
276 ) -> Result<Output> {
277 match stmt {
278 Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
279 if let Statement::Explain(explain) = &stmt
281 && let Some(format) = explain.format()
282 {
283 query_ctx.set_explain_format(format.to_string());
284 }
285
286 self.plan_and_exec_sql(stmt, &query_ctx, query_interceptor)
287 .await
288 }
289 Statement::Tql(tql) => {
290 self.plan_and_exec_tql(&query_ctx, query_interceptor, tql)
291 .await
292 }
293 _ => {
294 query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?;
295 self.statement_executor
296 .execute_sql(stmt, query_ctx)
297 .await
298 .context(TableOperationSnafu)
299 }
300 }
301 }
302
303 async fn plan_and_exec_sql(
304 &self,
305 stmt: Statement,
306 query_ctx: &QueryContextRef,
307 query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
308 ) -> Result<Output> {
309 let stmt = QueryStatement::Sql(stmt);
310 let plan = self
311 .statement_executor
312 .plan(&stmt, query_ctx.clone())
313 .await?;
314 let QueryStatement::Sql(stmt) = stmt else {
315 unreachable!()
316 };
317 query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?;
318 self.statement_executor
319 .exec_plan(plan, query_ctx.clone())
320 .await
321 .context(TableOperationSnafu)
322 }
323
324 async fn plan_and_exec_tql(
325 &self,
326 query_ctx: &QueryContextRef,
327 query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
328 tql: Tql,
329 ) -> Result<Output> {
330 let plan = self
331 .statement_executor
332 .plan_tql(tql.clone(), query_ctx)
333 .await?;
334 query_interceptor.pre_execute(&Statement::Tql(tql), Some(&plan), query_ctx.clone())?;
335 self.statement_executor
336 .exec_plan(plan, query_ctx.clone())
337 .await
338 .context(TableOperationSnafu)
339 }
340
341 async fn check_otlp_legacy(
342 &self,
343 names: &[&String],
344 ctx: QueryContextRef,
345 ) -> server_error::Result<bool> {
346 let db_string = ctx.get_db_string();
347 let cache = self
349 .otlp_metrics_table_legacy_cache
350 .entry(db_string.clone())
351 .or_default();
352 if let Some(flag) = fast_legacy_check(&cache, names)? {
353 return Ok(flag);
354 }
355 drop(cache);
357
358 let catalog = ctx.current_catalog();
359 let schema = ctx.current_schema();
360
361 let normalized_names = names
363 .iter()
364 .map(|n| legacy_normalize_otlp_name(n))
365 .collect::<Vec<_>>();
366 let table_names = normalized_names
367 .iter()
368 .map(|n| TableNameKey::new(catalog, &schema, n))
369 .collect::<Vec<_>>();
370 let table_values = self
371 .table_metadata_manager()
372 .table_name_manager()
373 .batch_get(table_names)
374 .await
375 .context(CommonMetaSnafu)?;
376 let table_ids = table_values
377 .into_iter()
378 .filter_map(|v| v.map(|vi| vi.table_id()))
379 .collect::<Vec<_>>();
380
381 if table_ids.is_empty() {
383 let cache = self
384 .otlp_metrics_table_legacy_cache
385 .entry(db_string)
386 .or_default();
387 names.iter().for_each(|name| {
388 cache.insert((*name).clone(), false);
389 });
390 return Ok(false);
391 }
392
393 let table_infos = self
395 .table_metadata_manager()
396 .table_info_manager()
397 .batch_get(&table_ids)
398 .await
399 .context(CommonMetaSnafu)?;
400 let options = table_infos
401 .values()
402 .map(|info| {
403 info.table_info
404 .meta
405 .options
406 .extra_options
407 .get(OTLP_METRIC_COMPAT_KEY)
408 .unwrap_or(&OTLP_LEGACY_DEFAULT_VALUE)
409 })
410 .collect::<Vec<_>>();
411 let cache = self
412 .otlp_metrics_table_legacy_cache
413 .entry(db_string)
414 .or_default();
415 if !options.is_empty() {
416 let has_prom = options.iter().any(|opt| *opt == OTLP_METRIC_COMPAT_PROM);
418 let has_legacy = options
419 .iter()
420 .any(|opt| *opt == OTLP_LEGACY_DEFAULT_VALUE.as_str());
421 ensure!(!(has_prom && has_legacy), OtlpMetricModeIncompatibleSnafu);
422 let flag = has_legacy;
423 names.iter().for_each(|name| {
424 cache.insert((*name).clone(), flag);
425 });
426 Ok(flag)
427 } else {
428 names.iter().for_each(|name| {
430 cache.insert((*name).clone(), false);
431 });
432 Ok(false)
433 }
434 }
435}
436
437fn fast_legacy_check(
438 cache: &DashMap<String, bool>,
439 names: &[&String],
440) -> server_error::Result<Option<bool>> {
441 let hit_cache = names
442 .iter()
443 .filter_map(|name| cache.get(*name))
444 .collect::<Vec<_>>();
445 if !hit_cache.is_empty() {
446 let hit_legacy = hit_cache.iter().any(|en| *en.value());
447 let hit_prom = hit_cache.iter().any(|en| !*en.value());
448
449 ensure!(!(hit_legacy && hit_prom), OtlpMetricModeIncompatibleSnafu);
453
454 let flag = hit_legacy;
455 drop(hit_cache);
457
458 names.iter().for_each(|name| {
460 if !cache.contains_key(*name) {
461 cache.insert((*name).clone(), flag);
462 }
463 });
464 Ok(Some(flag))
465 } else {
466 Ok(None)
467 }
468}
469
470fn derive_timeout(stmt: &Statement, query_ctx: &QueryContextRef) -> Option<Duration> {
473 let query_timeout = query_ctx.query_timeout()?;
474 if query_timeout.is_zero() {
475 return None;
476 }
477 match query_ctx.channel() {
478 Channel::Mysql if stmt.is_readonly() => Some(query_timeout),
479 Channel::Postgres => Some(query_timeout),
480 _ => None,
481 }
482}
483
/// Applies the remaining `timeout` budget to `output`.
///
/// Fails with a statement-timeout error when `timeout` is already zero. Affected-row and
/// record-batch outputs are returned unchanged. A streaming output is re-wrapped so that
/// waiting for each item is bounded by the budget that is still left; once the budget is
/// used up the stream yields a stream-timeout error.
fn attach_timeout(output: Output, mut timeout: Duration) -> Result<Output> {
    if timeout.is_zero() {
        return StatementTimeoutSnafu.fail();
    }

    let output = match output.data {
        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
        OutputData::Stream(mut stream) => {
            // Capture the schema before the inner stream is moved into the wrapper.
            let schema = stream.schema();
            let s = Box::pin(stream! {
                let mut start = tokio::time::Instant::now();
                // Waiting for the next item may take at most the remaining budget.
                while let Some(item) = tokio::time::timeout(timeout, stream.next()).await.map_err(|_| StreamTimeoutSnafu.build())? {
                    yield item;

                    // Charge the time elapsed since the previous checkpoint to the budget.
                    let now = tokio::time::Instant::now();
                    timeout = timeout.checked_sub(now - start).unwrap_or(Duration::ZERO);
                    start = now;
                    if timeout.is_zero() {
                        // Budget exhausted: emit a timeout error and end the stream.
                        StreamTimeoutSnafu.fail()?;
                    }
                }
            }) as Pin<Box<dyn Stream<Item = _> + Send>>;
            let stream = RecordBatchStreamWrapper {
                schema,
                stream: s,
                output_ordering: None,
                metrics: Default::default(),
                span: Span::current(),
            };
            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
        }
    };

    Ok(output)
}
520
521#[async_trait]
522impl SqlQueryHandler for Instance {
523 type Error = Error;
524
525 #[tracing::instrument(skip_all)]
526 async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
527 if self.is_suspended() {
528 return vec![error::SuspendedSnafu {}.fail()];
529 }
530
531 let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
532 let query_interceptor = query_interceptor_opt.as_ref();
533 let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) {
534 Ok(q) => q,
535 Err(e) => return vec![Err(e)],
536 };
537
538 let checker_ref = self.plugins.get::<PermissionCheckerRef>();
539 let checker = checker_ref.as_ref();
540
541 match parse_stmt(query.as_ref(), query_ctx.sql_dialect())
542 .and_then(|stmts| query_interceptor.post_parsing(stmts, query_ctx.clone()))
543 {
544 Ok(stmts) => {
545 if stmts.is_empty() {
546 return vec![
547 InvalidSqlSnafu {
548 err_msg: "empty statements",
549 }
550 .fail(),
551 ];
552 }
553
554 let mut results = Vec::with_capacity(stmts.len());
555 for stmt in stmts {
556 if let Err(e) = checker
557 .check_permission(
558 query_ctx.current_user(),
559 PermissionReq::SqlStatement(&stmt),
560 )
561 .context(PermissionSnafu)
562 {
563 results.push(Err(e));
564 break;
565 }
566
567 match self.query_statement(stmt.clone(), query_ctx.clone()).await {
568 Ok(output) => {
569 let output_result =
570 query_interceptor.post_execute(output, query_ctx.clone());
571 results.push(output_result);
572 }
573 Err(e) => {
574 if e.status_code().should_log_error() {
575 error!(e; "Failed to execute query: {stmt}");
576 } else {
577 debug!("Failed to execute query: {stmt}, {e}");
578 }
579 results.push(Err(e));
580 break;
581 }
582 }
583 }
584 results
585 }
586 Err(e) => {
587 vec![Err(e)]
588 }
589 }
590 }
591
592 async fn do_exec_plan(
593 &self,
594 stmt: Option<Statement>,
595 plan: LogicalPlan,
596 query_ctx: QueryContextRef,
597 ) -> Result<Output> {
598 ensure!(!self.is_suspended(), error::SuspendedSnafu);
599
600 if should_capture_statement(stmt.as_ref()) {
601 let stmt = stmt.unwrap();
603 let query = stmt.to_string();
604 let slow_query_timer = self
605 .slow_query_options
606 .enable
607 .then(|| self.event_recorder.clone())
608 .flatten()
609 .map(|event_recorder| {
610 SlowQueryTimer::new(
611 CatalogQueryStatement::Sql(stmt.clone()),
612 self.slow_query_options.threshold,
613 self.slow_query_options.sample_ratio,
614 self.slow_query_options.record_type,
615 event_recorder,
616 )
617 });
618
619 let ticket = self.process_manager.register_query(
620 query_ctx.current_catalog().to_string(),
621 vec![query_ctx.current_schema()],
622 query,
623 query_ctx.conn_info().to_string(),
624 Some(query_ctx.process_id()),
625 slow_query_timer,
626 );
627
628 let query_fut = self.query_engine.execute(plan.clone(), query_ctx);
629
630 CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
631 .await
632 .map_err(|_| error::CancelledSnafu.build())?
633 .map(|output| {
634 let Output { meta, data } = output;
635
636 let data = match data {
637 OutputData::Stream(stream) => OutputData::Stream(Box::pin(
638 CancellableStreamWrapper::new(stream, ticket),
639 )),
640 other => other,
641 };
642 Output { data, meta }
643 })
644 .context(ExecLogicalPlanSnafu)
645 } else {
646 self.query_engine
649 .execute(plan.clone(), query_ctx)
650 .await
651 .context(ExecLogicalPlanSnafu)
652 }
653 }
654
655 #[tracing::instrument(skip_all)]
656 async fn do_promql_query(
657 &self,
658 query: &PromQuery,
659 query_ctx: QueryContextRef,
660 ) -> Vec<Result<Output>> {
661 if self.is_suspended() {
662 return vec![error::SuspendedSnafu {}.fail()];
663 }
664
665 let result = PrometheusHandler::do_query(self, query, query_ctx)
667 .await
668 .with_context(|_| ExecutePromqlSnafu {
669 query: format!("{query:?}"),
670 });
671 vec![result]
672 }
673
674 async fn do_describe(
675 &self,
676 stmt: Statement,
677 query_ctx: QueryContextRef,
678 ) -> Result<Option<DescribeResult>> {
679 ensure!(!self.is_suspended(), error::SuspendedSnafu);
680
681 if matches!(
682 stmt,
683 Statement::Insert(_) | Statement::Query(_) | Statement::Delete(_)
684 ) {
685 self.plugins
686 .get::<PermissionCheckerRef>()
687 .as_ref()
688 .check_permission(query_ctx.current_user(), PermissionReq::SqlStatement(&stmt))
689 .context(PermissionSnafu)?;
690
691 let plan = self
692 .query_engine
693 .planner()
694 .plan(&QueryStatement::Sql(stmt), query_ctx.clone())
695 .await
696 .context(PlanStatementSnafu)?;
697 self.query_engine
698 .describe(plan, query_ctx)
699 .await
700 .map(Some)
701 .context(error::DescribeStatementSnafu)
702 } else {
703 Ok(None)
704 }
705 }
706
707 async fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result<bool> {
708 self.catalog_manager
709 .schema_exists(catalog, schema, None)
710 .await
711 .context(error::CatalogSnafu)
712 }
713}
714
715pub fn attach_timer(output: Output, timer: HistogramTimer) -> Output {
717 match output.data {
718 OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
719 OutputData::Stream(stream) => {
720 let stream = OnDone::new(stream, move || {
721 timer.observe_duration();
722 });
723 Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
724 }
725 }
726}
727
728#[async_trait]
729impl PrometheusHandler for Instance {
730 #[tracing::instrument(skip_all)]
731 async fn do_query(
732 &self,
733 query: &PromQuery,
734 query_ctx: QueryContextRef,
735 ) -> server_error::Result<Output> {
736 let interceptor = self
737 .plugins
738 .get::<PromQueryInterceptorRef<server_error::Error>>();
739
740 self.plugins
741 .get::<PermissionCheckerRef>()
742 .as_ref()
743 .check_permission(query_ctx.current_user(), PermissionReq::PromQuery)
744 .context(AuthSnafu)?;
745
746 let stmt = QueryLanguageParser::parse_promql(query, &query_ctx).with_context(|_| {
747 ParsePromQLSnafu {
748 query: query.clone(),
749 }
750 })?;
751
752 let plan = self
753 .statement_executor
754 .plan(&stmt, query_ctx.clone())
755 .await
756 .map_err(BoxedError::new)
757 .context(ExecuteQuerySnafu)?;
758
759 interceptor.pre_execute(query, Some(&plan), query_ctx.clone())?;
760
761 let query_statement = if let QueryStatement::Promql(eval_stmt, alias) = stmt {
763 CatalogQueryStatement::Promql(eval_stmt, alias)
764 } else {
765 return UnexpectedResultSnafu {
767 reason: "The query should always be promql.".to_string(),
768 }
769 .fail();
770 };
771 let query = query_statement.to_string();
772
773 let slow_query_timer = self
774 .slow_query_options
775 .enable
776 .then(|| self.event_recorder.clone())
777 .flatten()
778 .map(|event_recorder| {
779 SlowQueryTimer::new(
780 query_statement,
781 self.slow_query_options.threshold,
782 self.slow_query_options.sample_ratio,
783 self.slow_query_options.record_type,
784 event_recorder,
785 )
786 });
787
788 let ticket = self.process_manager.register_query(
789 query_ctx.current_catalog().to_string(),
790 vec![query_ctx.current_schema()],
791 query,
792 query_ctx.conn_info().to_string(),
793 Some(query_ctx.process_id()),
794 slow_query_timer,
795 );
796
797 let query_fut = self.statement_executor.exec_plan(plan, query_ctx.clone());
798
799 let output = CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
800 .await
801 .map_err(|_| servers::error::CancelledSnafu.build())?
802 .map(|output| {
803 let Output { meta, data } = output;
804 let data = match data {
805 OutputData::Stream(stream) => {
806 OutputData::Stream(Box::pin(CancellableStreamWrapper::new(stream, ticket)))
807 }
808 other => other,
809 };
810 Output { data, meta }
811 })
812 .map_err(BoxedError::new)
813 .context(ExecuteQuerySnafu)?;
814
815 Ok(interceptor.post_execute(output, query_ctx)?)
816 }
817
818 async fn query_metric_names(
819 &self,
820 matchers: Vec<Matcher>,
821 ctx: &QueryContextRef,
822 ) -> server_error::Result<Vec<String>> {
823 self.handle_query_metric_names(matchers, ctx)
824 .await
825 .map_err(BoxedError::new)
826 .context(ExecuteQuerySnafu)
827 }
828
829 async fn query_label_values(
830 &self,
831 metric: String,
832 label_name: String,
833 matchers: Vec<Matcher>,
834 start: SystemTime,
835 end: SystemTime,
836 ctx: &QueryContextRef,
837 ) -> server_error::Result<Vec<String>> {
838 self.handle_query_label_values(metric, label_name, matchers, start, end, ctx)
839 .await
840 .map_err(BoxedError::new)
841 .context(ExecuteQuerySnafu)
842 }
843
844 fn catalog_manager(&self) -> CatalogManagerRef {
845 self.catalog_manager.clone()
846 }
847}
848
// Validates the optional `database` field of a statement against the current catalog.
//
// Expands to nothing observable when `$stmt.database` is `None`. Otherwise it calls
// `validate_catalog_and_schema` with the context's current catalog and that database, and
// uses `?` to return a `SqlExecIntercepted` error from the enclosing function on failure.
macro_rules! validate_db_permission {
    ($stmt: expr, $query_ctx: expr) => {
        if let Some(database) = &$stmt.database {
            validate_catalog_and_schema($query_ctx.current_catalog(), database, $query_ctx)
                .map_err(BoxedError::new)
                .context(SqlExecInterceptedSnafu)?;
        }
    };
}
859
860pub fn check_permission(
861 plugins: Plugins,
862 stmt: &Statement,
863 query_ctx: &QueryContextRef,
864) -> Result<()> {
865 let need_validate = plugins
866 .get::<QueryOptions>()
867 .map(|opts| opts.disallow_cross_catalog_query)
868 .unwrap_or_default();
869
870 if !need_validate {
871 return Ok(());
872 }
873
874 match stmt {
875 Statement::Admin(_) => {}
878 Statement::Query(_)
880 | Statement::Explain(_)
881 | Statement::Tql(_)
882 | Statement::Delete(_)
883 | Statement::DeclareCursor(_)
884 | Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(_)) => {}
885 Statement::CreateDatabase(_)
887 | Statement::ShowDatabases(_)
888 | Statement::DropDatabase(_)
889 | Statement::AlterDatabase(_)
890 | Statement::DropFlow(_)
891 | Statement::Use(_) => {}
892 #[cfg(feature = "enterprise")]
893 Statement::DropTrigger(_) => {}
894 Statement::ShowCreateDatabase(stmt) => {
895 validate_database(&stmt.database_name, query_ctx)?;
896 }
897 Statement::ShowCreateTable(stmt) => {
898 validate_param(&stmt.table_name, query_ctx)?;
899 }
900 Statement::ShowCreateFlow(stmt) => {
901 validate_flow(&stmt.flow_name, query_ctx)?;
902 }
903 #[cfg(feature = "enterprise")]
904 Statement::ShowCreateTrigger(stmt) => {
905 validate_param(&stmt.trigger_name, query_ctx)?;
906 }
907 Statement::ShowCreateView(stmt) => {
908 validate_param(&stmt.view_name, query_ctx)?;
909 }
910 Statement::CreateExternalTable(stmt) => {
911 validate_param(&stmt.name, query_ctx)?;
912 }
913 Statement::CreateFlow(stmt) => {
914 validate_param(&stmt.sink_table_name, query_ctx)?;
916 }
917 #[cfg(feature = "enterprise")]
918 Statement::CreateTrigger(stmt) => {
919 validate_param(&stmt.trigger_name, query_ctx)?;
920 }
921 Statement::CreateView(stmt) => {
922 validate_param(&stmt.name, query_ctx)?;
923 }
924 Statement::AlterTable(stmt) => {
925 validate_param(stmt.table_name(), query_ctx)?;
926 }
927 #[cfg(feature = "enterprise")]
928 Statement::AlterTrigger(_) => {}
929 Statement::SetVariables(_) | Statement::ShowVariables(_) => {}
931 Statement::ShowCharset(_) | Statement::ShowCollation(_) => {}
933
934 Statement::Comment(comment) => match &comment.object {
935 CommentObject::Table(table) => validate_param(table, query_ctx)?,
936 CommentObject::Column { table, .. } => validate_param(table, query_ctx)?,
937 CommentObject::Flow(flow) => validate_flow(flow, query_ctx)?,
938 },
939
940 Statement::Insert(insert) => {
941 let name = insert.table_name().context(ParseSqlSnafu)?;
942 validate_param(name, query_ctx)?;
943 }
944 Statement::CreateTable(stmt) => {
945 validate_param(&stmt.name, query_ctx)?;
946 }
947 Statement::CreateTableLike(stmt) => {
948 validate_param(&stmt.table_name, query_ctx)?;
949 validate_param(&stmt.source_name, query_ctx)?;
950 }
951 Statement::DropTable(drop_stmt) => {
952 for table_name in drop_stmt.table_names() {
953 validate_param(table_name, query_ctx)?;
954 }
955 }
956 Statement::DropView(stmt) => {
957 validate_param(&stmt.view_name, query_ctx)?;
958 }
959 Statement::ShowTables(stmt) => {
960 validate_db_permission!(stmt, query_ctx);
961 }
962 Statement::ShowTableStatus(stmt) => {
963 validate_db_permission!(stmt, query_ctx);
964 }
965 Statement::ShowColumns(stmt) => {
966 validate_db_permission!(stmt, query_ctx);
967 }
968 Statement::ShowIndex(stmt) => {
969 validate_db_permission!(stmt, query_ctx);
970 }
971 Statement::ShowRegion(stmt) => {
972 validate_db_permission!(stmt, query_ctx);
973 }
974 Statement::ShowViews(stmt) => {
975 validate_db_permission!(stmt, query_ctx);
976 }
977 Statement::ShowFlows(stmt) => {
978 validate_db_permission!(stmt, query_ctx);
979 }
980 #[cfg(feature = "enterprise")]
981 Statement::ShowTriggers(_stmt) => {
982 }
985 Statement::ShowStatus(_stmt) => {}
986 Statement::ShowSearchPath(_stmt) => {}
987 Statement::DescribeTable(stmt) => {
988 validate_param(stmt.name(), query_ctx)?;
989 }
990 Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => match stmt {
991 CopyTable::To(copy_table_to) => validate_param(©_table_to.table_name, query_ctx)?,
992 CopyTable::From(copy_table_from) => {
993 validate_param(©_table_from.table_name, query_ctx)?
994 }
995 },
996 Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
997 match copy_database {
998 CopyDatabase::To(stmt) => validate_database(&stmt.database_name, query_ctx)?,
999 CopyDatabase::From(stmt) => validate_database(&stmt.database_name, query_ctx)?,
1000 }
1001 }
1002 Statement::TruncateTable(stmt) => {
1003 validate_param(stmt.table_name(), query_ctx)?;
1004 }
1005 Statement::FetchCursor(_) | Statement::CloseCursor(_) => {}
1007 Statement::Kill(_) => {}
1009 Statement::ShowProcesslist(_) => {}
1011 }
1012 Ok(())
1013}
1014
1015fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
1016 let (catalog, schema, _) = table_idents_to_full_name(name, query_ctx)
1017 .map_err(BoxedError::new)
1018 .context(ExternalSnafu)?;
1019
1020 validate_catalog_and_schema(&catalog, &schema, query_ctx)
1021 .map_err(BoxedError::new)
1022 .context(SqlExecInterceptedSnafu)
1023}
1024
1025fn validate_flow(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
1026 let catalog = match &name.0[..] {
1027 [_flow] => query_ctx.current_catalog().to_string(),
1028 [catalog, _flow] => catalog.to_string_unquoted(),
1029 _ => {
1030 return InvalidSqlSnafu {
1031 err_msg: format!(
1032 "expect flow name to be <catalog>.<flow_name> or <flow_name>, actual: {name}",
1033 ),
1034 }
1035 .fail();
1036 }
1037 };
1038
1039 let schema = query_ctx.current_schema();
1040
1041 validate_catalog_and_schema(&catalog, &schema, query_ctx)
1042 .map_err(BoxedError::new)
1043 .context(SqlExecInterceptedSnafu)
1044}
1045
1046fn validate_database(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
1047 let (catalog, schema) = match &name.0[..] {
1048 [schema] => (
1049 query_ctx.current_catalog().to_string(),
1050 schema.to_string_unquoted(),
1051 ),
1052 [catalog, schema] => (catalog.to_string_unquoted(), schema.to_string_unquoted()),
1053 _ => InvalidSqlSnafu {
1054 err_msg: format!(
1055 "expect database name to be <catalog>.<schema> or <schema>, actual: {name}",
1056 ),
1057 }
1058 .fail()?,
1059 };
1060
1061 validate_catalog_and_schema(&catalog, &schema, query_ctx)
1062 .map_err(BoxedError::new)
1063 .context(SqlExecInterceptedSnafu)
1064}
1065
1066fn should_capture_statement(stmt: Option<&Statement>) -> bool {
1068 if let Some(stmt) = stmt {
1069 matches!(stmt, Statement::Query(_)) || stmt.is_readonly()
1070 } else {
1071 false
1072 }
1073}
1074
1075#[cfg(test)]
1076mod tests {
1077 use std::collections::HashMap;
1078 use std::sync::atomic::{AtomicBool, Ordering};
1079 use std::sync::{Arc, Barrier};
1080 use std::thread;
1081 use std::time::{Duration, Instant};
1082
1083 use common_base::Plugins;
1084 use query::query_engine::options::QueryOptions;
1085 use session::context::QueryContext;
1086 use sql::dialect::GreptimeDbDialect;
1087 use strfmt::Format;
1088
1089 use super::*;
1090
    // Exercises `fast_legacy_check` with cache hits, cache misses, conflicting cached
    // flags, and concurrent callers sharing one cache.
    #[test]
    fn test_fast_legacy_check_deadlock_prevention() {
        let cache = DashMap::new();

        // Seed the cache with two `true` entries and one `false` entry.
        cache.insert("metric1".to_string(), true);
        cache.insert("metric2".to_string(), false);
        cache.insert("metric3".to_string(), true);

        // Case 1: one cached name (`true`) plus one unknown name.
        let metric1 = "metric1".to_string();
        let metric4 = "metric4".to_string();
        let names1 = vec![&metric1, &metric4];
        let result = fast_legacy_check(&cache, &names1);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), Some(true));

        // The unknown name must have been back-filled with the same flag.
        assert!(cache.contains_key("metric4"));
        assert!(*cache.get("metric4").unwrap().value());

        // Case 2: no name is cached, so the fast path cannot decide.
        let metric5 = "metric5".to_string();
        let metric6 = "metric6".to_string();
        let names2 = vec![&metric5, &metric6];
        let result = fast_legacy_check(&cache, &names2);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), None);

        // Case 3: cached names disagree (`true` vs `false`), which must be an error.
        let cache_incompatible = DashMap::new();
        cache_incompatible.insert("metric1".to_string(), true);
        cache_incompatible.insert("metric2".to_string(), false);
        let metric1_test = "metric1".to_string();
        let metric2_test = "metric2".to_string();
        let names3 = vec![&metric1_test, &metric2_test];
        let result = fast_legacy_check(&cache_incompatible, &names3);
        assert!(result.is_err());

        // Case 4: several threads call the function concurrently on a shared cache.
        let cache_concurrent = Arc::new(DashMap::new());
        cache_concurrent.insert("shared_metric".to_string(), true);

        let num_threads = 8;
        let operations_per_thread = 100;
        let barrier = Arc::new(Barrier::new(num_threads));
        let success_flag = Arc::new(AtomicBool::new(true));

        let handles: Vec<_> = (0..num_threads)
            .map(|thread_id| {
                let cache_clone = Arc::clone(&cache_concurrent);
                let barrier_clone = Arc::clone(&barrier);
                let success_flag_clone = Arc::clone(&success_flag);

                thread::spawn(move || {
                    // Start all threads at the same time.
                    barrier_clone.wait();

                    let start_time = Instant::now();
                    for i in 0..operations_per_thread {
                        // Every call mixes the shared cached name with a fresh name.
                        let shared_metric = "shared_metric".to_string();
                        let new_metric = format!("thread_{}_metric_{}", thread_id, i);
                        let names = vec![&shared_metric, &new_metric];

                        match fast_legacy_check(&cache_clone, &names) {
                            Ok(_) => {}
                            Err(_) => {
                                success_flag_clone.store(false, Ordering::Relaxed);
                                return;
                            }
                        }

                        // Treat an unexpectedly slow run as a failure.
                        if start_time.elapsed() > Duration::from_secs(10) {
                            success_flag_clone.store(false, Ordering::Relaxed);
                            return;
                        }
                    }
                })
            })
            .collect();

        // Join all workers; the elapsed-time check runs after each join returns.
        let start_time = Instant::now();
        for (i, handle) in handles.into_iter().enumerate() {
            let join_result = handle.join();

            if start_time.elapsed() > Duration::from_secs(30) {
                panic!("Test timed out - possible deadlock detected!");
            }

            if join_result.is_err() {
                panic!("Thread {} panicked during execution", i);
            }
        }

        assert!(
            success_flag.load(Ordering::Relaxed),
            "Some operations failed"
        );

        // The cache must hold more than half of the per-thread names plus the shared one.
        let final_count = cache_concurrent.len();
        assert!(
            final_count > 1 + num_threads * operations_per_thread / 2,
            "Expected more cache entries, got {}",
            final_count
        );
    }
1206
/// Runs `check_permission` over a range of statements with
/// `disallow_cross_catalog_query` switched on, asserting which ones are accepted
/// and which ones are rejected.
#[test]
fn test_exec_validation() {
    /// Fills the `{catalog}` and `{schema}` placeholders of `template`.
    fn do_fmt(template: &str, catalog: &str, schema: &str) -> String {
        let vars: HashMap<_, _> = [("catalog", catalog), ("schema", schema)]
            .into_iter()
            .map(|(key, value)| (key.to_string(), value))
            .collect();
        template.format(&vars).unwrap()
    }

    /// Parses `sql` and checks its first statement, expecting success when
    /// `is_ok` is true and an error otherwise.
    fn do_test(sql: &str, plugins: Plugins, query_ctx: &QueryContextRef, is_ok: bool) {
        let parsed = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        let outcome = check_permission(plugins, &parsed[0], query_ctx);
        if !is_ok {
            assert!(outcome.is_err());
            return;
        }
        outcome.unwrap();
    }

    /// Instantiates `template_sql` with every accepted and then every rejected
    /// catalog/schema prefix pair and checks the outcome of each.
    fn replace_test(template_sql: &str, plugins: Plugins, query_ctx: &QueryContextRef) {
        let accepted = [("", ""), ("", "public."), ("greptime.", "public.")];
        let rejected = [
            ("wrongcatalog.", "public."),
            ("wrongcatalog.", "wrongschema."),
        ];

        let cases = accepted
            .iter()
            .map(|pair| (pair, true))
            .chain(rejected.iter().map(|pair| (pair, false)));
        for (&(catalog, schema), expect_ok) in cases {
            let sql = do_fmt(template_sql, catalog, schema);
            do_test(&sql, plugins.clone(), query_ctx, expect_ok);
        }
    }

    let query_ctx = QueryContext::arc();
    let plugins: Plugins = Plugins::new();
    plugins.insert(QueryOptions {
        disallow_cross_catalog_query: true,
    });

    // Parses a multi-statement script, checks the statement count, and requires
    // every statement to pass the permission check.
    let expect_all_permitted = |script: &str, expected_len: usize| {
        let parsed = parse_stmt(script, &GreptimeDbDialect {}).unwrap();
        assert_eq!(parsed.len(), expected_len);
        for stmt in &parsed {
            check_permission(plugins.clone(), stmt, &query_ctx).unwrap();
        }
    };

    expect_all_permitted(
        r#"
        SELECT * FROM demo;
        EXPLAIN SELECT * FROM demo;
        CREATE DATABASE test_database;
        SHOW DATABASES;
        "#,
        4,
    );

    expect_all_permitted(
        r#"
        SHOW CREATE TABLE demo;
        ALTER TABLE demo ADD COLUMN new_col INT;
        "#,
        2,
    );

    // INSERT, CREATE TABLE and DROP TABLE, each tried with every prefix pair.
    let templates = [
        "INSERT INTO {catalog}{schema}monitor(host) VALUES ('host1');",
        r#"CREATE TABLE {catalog}{schema}demo(
            host STRING,
            ts TIMESTAMP,
            TIME INDEX (ts),
            PRIMARY KEY(host)
        ) engine=mito;"#,
        "DROP TABLE {catalog}{schema}demo;",
    ];
    for template in templates {
        replace_test(template, plugins.clone(), &query_ctx);
    }

    // SHOW TABLES is expected to pass for both schema names used here.
    let show_public = parse_stmt("SHOW TABLES FROM public", &GreptimeDbDialect {}).unwrap();
    check_permission(plugins.clone(), &show_public[0], &query_ctx).unwrap();

    let show_private = parse_stmt("SHOW TABLES FROM private", &GreptimeDbDialect {}).unwrap();
    let private_outcome = check_permission(plugins.clone(), &show_private[0], &query_ctx);
    assert!(private_outcome.is_ok());

    // DESC TABLE with every prefix pair.
    replace_test(
        "DESC TABLE {catalog}{schema}demo;",
        plugins.clone(),
        &query_ctx,
    );

    // Flow statements: only a foreign catalog qualifier is expected to fail.
    let comment_flow_cases = [
        ("COMMENT ON FLOW my_flow IS 'comment';", true),
        ("COMMENT ON FLOW greptime.my_flow IS 'comment';", true),
        ("COMMENT ON FLOW wrongcatalog.my_flow IS 'comment';", false),
    ];
    let show_flow_cases = [
        ("SHOW CREATE FLOW my_flow;", true),
        ("SHOW CREATE FLOW greptime.my_flow;", true),
        ("SHOW CREATE FLOW wrongcatalog.my_flow;", false),
    ];
    for (sql, is_ok) in comment_flow_cases.into_iter().chain(show_flow_cases) {
        let parsed = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        let outcome = check_permission(plugins.clone(), &parsed[0], &query_ctx);
        assert_eq!(outcome.is_ok(), is_ok);
    }
}
1328}