pub mod builder;
mod grpc;
mod influxdb;
mod jaeger;
mod log_handler;
mod logs;
mod opentsdb;
mod otlp;
pub mod prom_store;
mod promql;
mod region_query;
pub mod standalone;

use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use async_stream::stream;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use catalog::CatalogManagerRef;
use catalog::process_manager::{
    ProcessManagerRef, QueryStatement as CatalogQueryStatement, SlowQueryTimer,
};
use client::OutputData;
use common_base::Plugins;
use common_base::cancellation::CancellableFuture;
use common_error::ext::{BoxedError, ErrorExt};
use common_event_recorder::EventRecorderRef;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::table_name::TableNameKey;
use common_meta::node_manager::NodeManagerRef;
use common_meta::procedure_executor::ProcedureExecutorRef;
use common_query::Output;
use common_recordbatch::RecordBatchStreamWrapper;
use common_recordbatch::error::StreamTimeoutSnafu;
use common_telemetry::logging::SlowQueryOptions;
use common_telemetry::{debug, error, tracing};
use dashmap::DashMap;
use datafusion_expr::LogicalPlan;
use futures::{Stream, StreamExt};
use lazy_static::lazy_static;
use operator::delete::DeleterRef;
use operator::insert::InserterRef;
use operator::statement::{StatementExecutor, StatementExecutorRef};
use partition::manager::PartitionRuleManagerRef;
use pipeline::pipeline_operator::PipelineOperator;
use prometheus::HistogramTimer;
use promql_parser::label::Matcher;
use query::QueryEngineRef;
use query::metrics::OnDone;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::query_engine::DescribeResult;
use query::query_engine::options::{QueryOptions, validate_catalog_and_schema};
use servers::error::{
    self as server_error, AuthSnafu, CommonMetaSnafu, ExecuteQuerySnafu,
    OtlpMetricModeIncompatibleSnafu, ParsePromQLSnafu, UnexpectedResultSnafu,
};
use servers::interceptor::{
    PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
};
use servers::otlp::metrics::legacy_normalize_otlp_name;
use servers::prometheus_handler::PrometheusHandler;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::{Channel, QueryContextRef};
use session::table_name::table_idents_to_full_name;
use snafu::prelude::*;
use sql::ast::ObjectNamePartExt;
use sql::dialect::Dialect;
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::copy::{CopyDatabase, CopyTable};
use sql::statements::statement::Statement;
use sql::statements::tql::Tql;
use sqlparser::ast::ObjectName;
pub use standalone::StandaloneDatanodeManager;
use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM};

use crate::error::{
    self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, InvalidSqlSnafu,
    ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
    StatementTimeoutSnafu, TableOperationSnafu,
};
use crate::limiter::LimiterRef;
use crate::stream_wrapper::CancellableStreamWrapper;

lazy_static! {
    static ref OTLP_LEGACY_DEFAULT_VALUE: String = "legacy".to_string();
}

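/// The frontend instance bundles what a frontend node needs to serve queries:
/// catalog access, the query engine, the statement executor, and the write
/// paths (inserter/deleter). It implements the protocol-facing handler traits
/// below, e.g. [`SqlQueryHandler`] and [`PrometheusHandler`].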
#[derive(Clone)]
pub struct Instance {
    catalog_manager: CatalogManagerRef,
    pipeline_operator: Arc<PipelineOperator>,
    statement_executor: Arc<StatementExecutor>,
    query_engine: QueryEngineRef,
    plugins: Plugins,
    inserter: InserterRef,
    deleter: DeleterRef,
    table_metadata_manager: TableMetadataManagerRef,
    event_recorder: Option<EventRecorderRef>,
    limiter: Option<LimiterRef>,
    process_manager: ProcessManagerRef,
    slow_query_options: SlowQueryOptions,

    /// Cache of whether OTLP metric tables use the legacy naming mode,
    /// keyed first by database string, then by metric/table name.
    otlp_metrics_table_legacy_cache: DashMap<String, DashMap<String, bool>>,
}

impl Instance {
    pub fn catalog_manager(&self) -> &CatalogManagerRef {
        &self.catalog_manager
    }

    pub fn query_engine(&self) -> &QueryEngineRef {
        &self.query_engine
    }

    pub fn plugins(&self) -> &Plugins {
        &self.plugins
    }

    pub fn statement_executor(&self) -> &StatementExecutorRef {
        &self.statement_executor
    }

    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
        &self.table_metadata_manager
    }

    pub fn inserter(&self) -> &InserterRef {
        &self.inserter
    }

    pub fn process_manager(&self) -> &ProcessManagerRef {
        &self.process_manager
    }

    pub fn node_manager(&self) -> &NodeManagerRef {
        self.inserter.node_manager()
    }

    pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
        self.inserter.partition_manager()
    }

    pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
        self.statement_executor.cache_invalidator()
    }

    pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
        self.statement_executor.procedure_executor()
    }
}

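/// Parses a raw SQL string into a list of statements using the dialect of the
/// current session.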
fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
    ParserContext::create_with_dialect(sql, dialect, ParseOptions::default()).context(ParseSqlSnafu)
}

impl Instance {
    async fn query_statement(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
        check_permission(self.plugins.clone(), &stmt, &query_ctx)?;

        let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
        let query_interceptor = query_interceptor.as_ref();

        if should_capture_statement(Some(&stmt)) {
            let slow_query_timer = self
                .slow_query_options
                .enable
                .then(|| self.event_recorder.clone())
                .flatten()
                .map(|event_recorder| {
                    SlowQueryTimer::new(
                        CatalogQueryStatement::Sql(stmt.clone()),
                        self.slow_query_options.threshold,
                        self.slow_query_options.sample_ratio,
                        self.slow_query_options.record_type,
                        event_recorder,
                    )
                });

            let ticket = self.process_manager.register_query(
                query_ctx.current_catalog().to_string(),
                vec![query_ctx.current_schema()],
                stmt.to_string(),
                query_ctx.conn_info().to_string(),
                Some(query_ctx.process_id()),
                slow_query_timer,
            );

            let query_fut = self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor);

            CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
                .await
                .map_err(|_| error::CancelledSnafu.build())?
                .map(|output| {
                    let Output { meta, data } = output;

                    let data = match data {
                        OutputData::Stream(stream) => OutputData::Stream(Box::pin(
                            CancellableStreamWrapper::new(stream, ticket),
                        )),
                        other => other,
                    };
                    Output { data, meta }
                })
        } else {
            self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor)
                .await
        }
    }

    async fn exec_statement_with_timeout(
        &self,
        stmt: Statement,
        query_ctx: QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
    ) -> Result<Output> {
        let timeout = derive_timeout(&stmt, &query_ctx);
        match timeout {
            Some(timeout) => {
                let start = tokio::time::Instant::now();
                let output = tokio::time::timeout(
                    timeout,
                    self.exec_statement(stmt, query_ctx, query_interceptor),
                )
                .await
                .map_err(|_| StatementTimeoutSnafu.build())??;
                // Attach the remaining budget to the output so that fetching
                // result batches is also bounded by the statement timeout.
                let remaining_timeout = timeout.checked_sub(start.elapsed()).unwrap_or_default();
                attach_timeout(output, remaining_timeout)
            }
            None => {
                self.exec_statement(stmt, query_ctx, query_interceptor)
                    .await
            }
        }
    }

    async fn exec_statement(
        &self,
        stmt: Statement,
        query_ctx: QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
    ) -> Result<Output> {
        match stmt {
            Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
                if let Statement::Explain(explain) = &stmt
                    && let Some(format) = explain.format()
                {
                    query_ctx.set_explain_format(format.to_string());
                }

                self.plan_and_exec_sql(stmt, &query_ctx, query_interceptor)
                    .await
            }
            Statement::Tql(tql) => {
                self.plan_and_exec_tql(&query_ctx, query_interceptor, tql)
                    .await
            }
            _ => {
                query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?;
                self.statement_executor
                    .execute_sql(stmt, query_ctx)
                    .await
                    .context(TableOperationSnafu)
            }
        }
    }

    async fn plan_and_exec_sql(
        &self,
        stmt: Statement,
        query_ctx: &QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
    ) -> Result<Output> {
        let stmt = QueryStatement::Sql(stmt);
        let plan = self
            .statement_executor
            .plan(&stmt, query_ctx.clone())
            .await?;
        let QueryStatement::Sql(stmt) = stmt else {
            unreachable!()
        };
        query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?;
        self.statement_executor
            .exec_plan(plan, query_ctx.clone())
            .await
            .context(TableOperationSnafu)
    }

    async fn plan_and_exec_tql(
        &self,
        query_ctx: &QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
        tql: Tql,
    ) -> Result<Output> {
        let plan = self
            .statement_executor
            .plan_tql(tql.clone(), query_ctx)
            .await?;
        query_interceptor.pre_execute(&Statement::Tql(tql), Some(&plan), query_ctx.clone())?;
        self.statement_executor
            .exec_plan(plan, query_ctx.clone())
            .await
            .context(TableOperationSnafu)
    }

    async fn check_otlp_legacy(
        &self,
        names: &[&String],
        ctx: QueryContextRef,
    ) -> server_error::Result<bool> {
        let db_string = ctx.get_db_string();
        // Fast path: answer from the per-database cache when possible.
        let cache = self
            .otlp_metrics_table_legacy_cache
            .entry(db_string.clone())
            .or_default();
        if let Some(flag) = fast_legacy_check(&cache, names)? {
            return Ok(flag);
        }
        // Release the entry guard before hitting the metadata backend.
        drop(cache);

        let catalog = ctx.current_catalog();
        let schema = ctx.current_schema();

        // Slow path: resolve the tables by their legacy-normalized names.
        let normalized_names = names
            .iter()
            .map(|n| legacy_normalize_otlp_name(n))
            .collect::<Vec<_>>();
        let table_names = normalized_names
            .iter()
            .map(|n| TableNameKey::new(catalog, &schema, n))
            .collect::<Vec<_>>();
        let table_values = self
            .table_metadata_manager()
            .table_name_manager()
            .batch_get(table_names)
            .await
            .context(CommonMetaSnafu)?;
        let table_ids = table_values
            .into_iter()
            .filter_map(|v| v.map(|vi| vi.table_id()))
            .collect::<Vec<_>>();

        // None of the tables exist yet: record them as non-legacy.
        if table_ids.is_empty() {
            let cache = self
                .otlp_metrics_table_legacy_cache
                .entry(db_string)
                .or_default();
            names.iter().for_each(|name| {
                cache.insert((*name).clone(), false);
            });
            return Ok(false);
        }

        let table_infos = self
            .table_metadata_manager()
            .table_info_manager()
            .batch_get(&table_ids)
            .await
            .context(CommonMetaSnafu)?;
        let options = table_infos
            .values()
            .map(|info| {
                info.table_info
                    .meta
                    .options
                    .extra_options
                    .get(OTLP_METRIC_COMPAT_KEY)
                    .unwrap_or(&OTLP_LEGACY_DEFAULT_VALUE)
            })
            .collect::<Vec<_>>();
        let cache = self
            .otlp_metrics_table_legacy_cache
            .entry(db_string)
            .or_default();
        if !options.is_empty() {
            // Mixing Prometheus-compatible and legacy tables in one request
            // is rejected.
            let has_prom = options.iter().any(|opt| *opt == OTLP_METRIC_COMPAT_PROM);
            let has_legacy = options
                .iter()
                .any(|opt| *opt == OTLP_LEGACY_DEFAULT_VALUE.as_str());
            ensure!(!(has_prom && has_legacy), OtlpMetricModeIncompatibleSnafu);
            let flag = has_legacy;
            names.iter().for_each(|name| {
                cache.insert((*name).clone(), flag);
            });
            Ok(flag)
        } else {
            // No table info was found: treat the metrics as non-legacy.
            names.iter().for_each(|name| {
                cache.insert((*name).clone(), false);
            });
            Ok(false)
        }
    }
}

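/// Answers the legacy-mode question for `names` from the cache alone.
/// Returns `Some(flag)` when at least one name is cached (backfilling missing
/// names with the same flag), `None` on a complete miss, and an error when
/// the cached entries mix legacy and Prometheus-compatible modes.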
fn fast_legacy_check(
    cache: &DashMap<String, bool>,
    names: &[&String],
) -> server_error::Result<Option<bool>> {
    let hit_cache = names
        .iter()
        .filter_map(|name| cache.get(*name))
        .collect::<Vec<_>>();
    if !hit_cache.is_empty() {
        let hit_legacy = hit_cache.iter().any(|en| *en.value());
        let hit_prom = hit_cache.iter().any(|en| !*en.value());

        // Cached entries must agree on a single mode.
        ensure!(!(hit_legacy && hit_prom), OtlpMetricModeIncompatibleSnafu);

        let flag = hit_legacy;
        // Drop the read guards before inserting, to avoid deadlocking on the
        // same DashMap shard.
        drop(hit_cache);

        // Backfill names not seen before with the agreed flag.
        names.iter().for_each(|name| {
            if !cache.contains_key(*name) {
                cache.insert((*name).clone(), flag);
            }
        });
        Ok(Some(flag))
    } else {
        Ok(None)
    }
}

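/// Derives the effective statement timeout from the query context: `None`
/// when no timeout is set or it is zero, and otherwise only for channels that
/// honor it (MySQL for read-only statements, Postgres for all statements).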
fn derive_timeout(stmt: &Statement, query_ctx: &QueryContextRef) -> Option<Duration> {
    let query_timeout = query_ctx.query_timeout()?;
    if query_timeout.is_zero() {
        return None;
    }
    match query_ctx.channel() {
        Channel::Mysql if stmt.is_readonly() => Some(query_timeout),
        Channel::Postgres => Some(query_timeout),
        _ => None,
    }
}

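/// Attaches the remaining timeout budget to a streaming [`Output`]: fails
/// immediately if the budget is already exhausted, otherwise each batch fetch
/// draws down the budget and the stream errors once it reaches zero.
/// Non-streaming outputs are returned unchanged.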
fn attach_timeout(output: Output, mut timeout: Duration) -> Result<Output> {
    if timeout.is_zero() {
        return StatementTimeoutSnafu.fail();
    }

    let output = match output.data {
        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
        OutputData::Stream(mut stream) => {
            let schema = stream.schema();
            let s = Box::pin(stream! {
                let mut start = tokio::time::Instant::now();
                while let Some(item) = tokio::time::timeout(timeout, stream.next()).await.map_err(|_| StreamTimeoutSnafu.build())? {
                    yield item;

                    // Deduct the time spent producing this batch from the budget.
                    let now = tokio::time::Instant::now();
                    timeout = timeout.checked_sub(now - start).unwrap_or(Duration::ZERO);
                    start = now;
                    if timeout.is_zero() {
                        StreamTimeoutSnafu.fail()?;
                    }
                }
            }) as Pin<Box<dyn Stream<Item = _> + Send>>;
            let stream = RecordBatchStreamWrapper {
                schema,
                stream: s,
                output_ordering: None,
                metrics: Default::default(),
            };
            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
        }
    };

    Ok(output)
}

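// SQL entry point used by the servers layer: each statement in the query
// string is parsed, permission-checked, and executed in order. A minimal
// usage sketch, assuming an already-built `instance` (illustrative only):
//
//   let results = instance.do_query("SELECT 1", QueryContext::arc()).await;
//   assert_eq!(results.len(), 1);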
#[async_trait]
impl SqlQueryHandler for Instance {
    type Error = Error;

    #[tracing::instrument(skip_all)]
    async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
        let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
        let query_interceptor = query_interceptor_opt.as_ref();
        let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) {
            Ok(q) => q,
            Err(e) => return vec![Err(e)],
        };

        let checker_ref = self.plugins.get::<PermissionCheckerRef>();
        let checker = checker_ref.as_ref();

        match parse_stmt(query.as_ref(), query_ctx.sql_dialect())
            .and_then(|stmts| query_interceptor.post_parsing(stmts, query_ctx.clone()))
        {
            Ok(stmts) => {
                if stmts.is_empty() {
                    return vec![
                        InvalidSqlSnafu {
                            err_msg: "empty statements",
                        }
                        .fail(),
                    ];
                }

                let mut results = Vec::with_capacity(stmts.len());
                for stmt in stmts {
                    if let Err(e) = checker
                        .check_permission(
                            query_ctx.current_user(),
                            PermissionReq::SqlStatement(&stmt),
                        )
                        .context(PermissionSnafu)
                    {
                        results.push(Err(e));
                        break;
                    }

                    match self.query_statement(stmt.clone(), query_ctx.clone()).await {
                        Ok(output) => {
                            let output_result =
                                query_interceptor.post_execute(output, query_ctx.clone());
                            results.push(output_result);
                        }
                        Err(e) => {
                            if e.status_code().should_log_error() {
                                error!(e; "Failed to execute query: {stmt}");
                            } else {
                                debug!("Failed to execute query: {stmt}, {e}");
                            }
                            results.push(Err(e));
                            break;
                        }
                    }
                }
                results
            }
            Err(e) => {
                vec![Err(e)]
            }
        }
    }

    async fn do_exec_plan(
        &self,
        stmt: Option<Statement>,
        plan: LogicalPlan,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        if should_capture_statement(stmt.as_ref()) {
            // Safe to unwrap: should_capture_statement only returns true for Some.
            let stmt = stmt.unwrap();
            let query = stmt.to_string();
            let slow_query_timer = self
                .slow_query_options
                .enable
                .then(|| self.event_recorder.clone())
                .flatten()
                .map(|event_recorder| {
                    SlowQueryTimer::new(
                        CatalogQueryStatement::Sql(stmt.clone()),
                        self.slow_query_options.threshold,
                        self.slow_query_options.sample_ratio,
                        self.slow_query_options.record_type,
                        event_recorder,
                    )
                });

            let ticket = self.process_manager.register_query(
                query_ctx.current_catalog().to_string(),
                vec![query_ctx.current_schema()],
                query,
                query_ctx.conn_info().to_string(),
                Some(query_ctx.process_id()),
                slow_query_timer,
            );

            let query_fut = self.query_engine.execute(plan.clone(), query_ctx);

            CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
                .await
                .map_err(|_| error::CancelledSnafu.build())?
                .map(|output| {
                    let Output { meta, data } = output;

                    let data = match data {
                        OutputData::Stream(stream) => OutputData::Stream(Box::pin(
                            CancellableStreamWrapper::new(stream, ticket),
                        )),
                        other => other,
                    };
                    Output { data, meta }
                })
                .context(ExecLogicalPlanSnafu)
        } else {
            // Not a tracked statement: execute the plan without process registration.
            self.query_engine
                .execute(plan.clone(), query_ctx)
                .await
                .context(ExecLogicalPlanSnafu)
        }
    }

    #[tracing::instrument(skip_all)]
    async fn do_promql_query(
        &self,
        query: &PromQuery,
        query_ctx: QueryContextRef,
    ) -> Vec<Result<Output>> {
        let result = PrometheusHandler::do_query(self, query, query_ctx)
            .await
            .with_context(|_| ExecutePromqlSnafu {
                query: format!("{query:?}"),
            });
        vec![result]
    }

    async fn do_describe(
        &self,
        stmt: Statement,
        query_ctx: QueryContextRef,
    ) -> Result<Option<DescribeResult>> {
        if matches!(
            stmt,
            Statement::Insert(_) | Statement::Query(_) | Statement::Delete(_)
        ) {
            self.plugins
                .get::<PermissionCheckerRef>()
                .as_ref()
                .check_permission(query_ctx.current_user(), PermissionReq::SqlStatement(&stmt))
                .context(PermissionSnafu)?;

            let plan = self
                .query_engine
                .planner()
                .plan(&QueryStatement::Sql(stmt), query_ctx.clone())
                .await
                .context(PlanStatementSnafu)?;
            self.query_engine
                .describe(plan, query_ctx)
                .await
                .map(Some)
                .context(error::DescribeStatementSnafu)
        } else {
            Ok(None)
        }
    }

    async fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result<bool> {
        self.catalog_manager
            .schema_exists(catalog, schema, None)
            .await
            .context(error::CatalogSnafu)
    }
}

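/// Attaches a Prometheus [`HistogramTimer`] to a streaming output so the
/// timer observes its duration once the stream is fully consumed;
/// non-streaming outputs are returned unchanged.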
pub fn attach_timer(output: Output, timer: HistogramTimer) -> Output {
    match output.data {
        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
        OutputData::Stream(stream) => {
            let stream = OnDone::new(stream, move || {
                timer.observe_duration();
            });
            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
        }
    }
}

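// PromQL entry point used by the Prometheus-compatible API: permission check,
// PromQL parsing, planning, then cancellable execution with slow-query
// tracking, mirroring the SQL path above.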
#[async_trait]
impl PrometheusHandler for Instance {
    #[tracing::instrument(skip_all)]
    async fn do_query(
        &self,
        query: &PromQuery,
        query_ctx: QueryContextRef,
    ) -> server_error::Result<Output> {
        let interceptor = self
            .plugins
            .get::<PromQueryInterceptorRef<server_error::Error>>();

        self.plugins
            .get::<PermissionCheckerRef>()
            .as_ref()
            .check_permission(query_ctx.current_user(), PermissionReq::PromQuery)
            .context(AuthSnafu)?;

        let stmt = QueryLanguageParser::parse_promql(query, &query_ctx).with_context(|_| {
            ParsePromQLSnafu {
                query: query.clone(),
            }
        })?;

        let plan = self
            .statement_executor
            .plan(&stmt, query_ctx.clone())
            .await
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)?;

        interceptor.pre_execute(query, Some(&plan), query_ctx.clone())?;

        let query_statement = if let QueryStatement::Promql(eval_stmt, alias) = stmt {
            CatalogQueryStatement::Promql(eval_stmt, alias)
        } else {
            return UnexpectedResultSnafu {
                reason: "The query should always be promql.".to_string(),
            }
            .fail();
        };
        let query = query_statement.to_string();

        let slow_query_timer = self
            .slow_query_options
            .enable
            .then(|| self.event_recorder.clone())
            .flatten()
            .map(|event_recorder| {
                SlowQueryTimer::new(
                    query_statement,
                    self.slow_query_options.threshold,
                    self.slow_query_options.sample_ratio,
                    self.slow_query_options.record_type,
                    event_recorder,
                )
            });

        let ticket = self.process_manager.register_query(
            query_ctx.current_catalog().to_string(),
            vec![query_ctx.current_schema()],
            query,
            query_ctx.conn_info().to_string(),
            Some(query_ctx.process_id()),
            slow_query_timer,
        );

        let query_fut = self.statement_executor.exec_plan(plan, query_ctx.clone());

        let output = CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
            .await
            .map_err(|_| servers::error::CancelledSnafu.build())?
            .map(|output| {
                let Output { meta, data } = output;
                let data = match data {
                    OutputData::Stream(stream) => {
                        OutputData::Stream(Box::pin(CancellableStreamWrapper::new(stream, ticket)))
                    }
                    other => other,
                };
                Output { data, meta }
            })
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)?;

        Ok(interceptor.post_execute(output, query_ctx)?)
    }

    async fn query_metric_names(
        &self,
        matchers: Vec<Matcher>,
        ctx: &QueryContextRef,
    ) -> server_error::Result<Vec<String>> {
        self.handle_query_metric_names(matchers, ctx)
            .await
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)
    }

    async fn query_label_values(
        &self,
        metric: String,
        label_name: String,
        matchers: Vec<Matcher>,
        start: SystemTime,
        end: SystemTime,
        ctx: &QueryContextRef,
    ) -> server_error::Result<Vec<String>> {
        self.handle_query_label_values(metric, label_name, matchers, start, end, ctx)
            .await
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)
    }

    fn catalog_manager(&self) -> CatalogManagerRef {
        self.catalog_manager.clone()
    }
}

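/// Validates that `$stmt.database`, when present, belongs to the catalog of
/// the current query context.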
macro_rules! validate_db_permission {
    ($stmt: expr, $query_ctx: expr) => {
        if let Some(database) = &$stmt.database {
            validate_catalog_and_schema($query_ctx.current_catalog(), database, $query_ctx)
                .map_err(BoxedError::new)
                .context(SqlExecInterceptedSnafu)?;
        }
    };
}

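/// Validates that a statement does not reach outside the current catalog when
/// `disallow_cross_catalog_query` is enabled. Statements executed through the
/// query engine (e.g. plain queries) pass through here and are validated
/// during planning instead.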
pub fn check_permission(
    plugins: Plugins,
    stmt: &Statement,
    query_ctx: &QueryContextRef,
) -> Result<()> {
    let need_validate = plugins
        .get::<QueryOptions>()
        .map(|opts| opts.disallow_cross_catalog_query)
        .unwrap_or_default();

    if !need_validate {
        return Ok(());
    }

    match stmt {
        // Will be checked in execution.
        Statement::Admin(_) => {}
        // These are executed by the query engine, which does its own checks.
        Statement::Query(_)
        | Statement::Explain(_)
        | Statement::Tql(_)
        | Statement::Delete(_)
        | Statement::DeclareCursor(_)
        | Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(_)) => {}
        Statement::CreateDatabase(_)
        | Statement::ShowDatabases(_)
        | Statement::DropDatabase(_)
        | Statement::AlterDatabase(_)
        | Statement::DropFlow(_)
        | Statement::Use(_) => {}
        #[cfg(feature = "enterprise")]
        Statement::DropTrigger(_) => {}
        Statement::ShowCreateDatabase(stmt) => {
            validate_database(&stmt.database_name, query_ctx)?;
        }
        Statement::ShowCreateTable(stmt) => {
            validate_param(&stmt.table_name, query_ctx)?;
        }
        Statement::ShowCreateFlow(stmt) => {
            validate_param(&stmt.flow_name, query_ctx)?;
        }
        #[cfg(feature = "enterprise")]
        Statement::ShowCreateTrigger(stmt) => {
            validate_param(&stmt.trigger_name, query_ctx)?;
        }
        Statement::ShowCreateView(stmt) => {
            validate_param(&stmt.view_name, query_ctx)?;
        }
        Statement::CreateExternalTable(stmt) => {
            validate_param(&stmt.name, query_ctx)?;
        }
        Statement::CreateFlow(stmt) => {
            validate_param(&stmt.sink_table_name, query_ctx)?;
        }
        #[cfg(feature = "enterprise")]
        Statement::CreateTrigger(stmt) => {
            validate_param(&stmt.trigger_name, query_ctx)?;
        }
        Statement::CreateView(stmt) => {
            validate_param(&stmt.name, query_ctx)?;
        }
        Statement::AlterTable(stmt) => {
            validate_param(stmt.table_name(), query_ctx)?;
        }
        #[cfg(feature = "enterprise")]
        Statement::AlterTrigger(_) => {}
        Statement::SetVariables(_) | Statement::ShowVariables(_) => {}
        Statement::ShowCharset(_) | Statement::ShowCollation(_) => {}

        Statement::Insert(insert) => {
            let name = insert.table_name().context(ParseSqlSnafu)?;
            validate_param(name, query_ctx)?;
        }
        Statement::CreateTable(stmt) => {
            validate_param(&stmt.name, query_ctx)?;
        }
        Statement::CreateTableLike(stmt) => {
            validate_param(&stmt.table_name, query_ctx)?;
            validate_param(&stmt.source_name, query_ctx)?;
        }
        Statement::DropTable(drop_stmt) => {
            for table_name in drop_stmt.table_names() {
                validate_param(table_name, query_ctx)?;
            }
        }
        Statement::DropView(stmt) => {
            validate_param(&stmt.view_name, query_ctx)?;
        }
        Statement::ShowTables(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowTableStatus(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowColumns(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowIndex(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowRegion(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowViews(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowFlows(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        #[cfg(feature = "enterprise")]
        Statement::ShowTriggers(_stmt) => {}
        Statement::ShowStatus(_stmt) => {}
        Statement::ShowSearchPath(_stmt) => {}
        Statement::DescribeTable(stmt) => {
            validate_param(stmt.name(), query_ctx)?;
        }
        Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => match stmt {
            CopyTable::To(copy_table_to) => validate_param(&copy_table_to.table_name, query_ctx)?,
            CopyTable::From(copy_table_from) => {
                validate_param(&copy_table_from.table_name, query_ctx)?
            }
        },
        Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
            match copy_database {
                CopyDatabase::To(stmt) => validate_database(&stmt.database_name, query_ctx)?,
                CopyDatabase::From(stmt) => validate_database(&stmt.database_name, query_ctx)?,
            }
        }
        Statement::TruncateTable(stmt) => {
            validate_param(stmt.table_name(), query_ctx)?;
        }
        Statement::FetchCursor(_) | Statement::CloseCursor(_) => {}
        Statement::Kill(_) => {}
        Statement::ShowProcesslist(_) => {}
    }
    Ok(())
}

fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
    let (catalog, schema, _) = table_idents_to_full_name(name, query_ctx)
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?;

    validate_catalog_and_schema(&catalog, &schema, query_ctx)
        .map_err(BoxedError::new)
        .context(SqlExecInterceptedSnafu)
}

fn validate_database(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
    let (catalog, schema) = match &name.0[..] {
        [schema] => (
            query_ctx.current_catalog().to_string(),
            schema.to_string_unquoted(),
        ),
        [catalog, schema] => (catalog.to_string_unquoted(), schema.to_string_unquoted()),
        _ => InvalidSqlSnafu {
            err_msg: format!(
                "expect database name to be <catalog>.<schema> or <schema>, actual: {name}",
            ),
        }
        .fail()?,
    };

    validate_catalog_and_schema(&catalog, &schema, query_ctx)
        .map_err(BoxedError::new)
        .context(SqlExecInterceptedSnafu)
}

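/// Returns whether a statement should be registered with the process manager
/// and timed for slow-query recording: only queries and other read-only
/// statements qualify.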
fn should_capture_statement(stmt: Option<&Statement>) -> bool {
    if let Some(stmt) = stmt {
        matches!(stmt, Statement::Query(_)) || stmt.is_readonly()
    } else {
        false
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::{Arc, Barrier};
    use std::thread;
    use std::time::{Duration, Instant};

    use common_base::Plugins;
    use query::query_engine::options::QueryOptions;
    use session::context::QueryContext;
    use sql::dialect::GreptimeDbDialect;
    use strfmt::Format;

    use super::*;

    #[test]
    fn test_fast_legacy_check_deadlock_prevention() {
        let cache = DashMap::new();

        // Case 1: partial cache hit backfills the missing name with the hit flag.
        cache.insert("metric1".to_string(), true);
        cache.insert("metric2".to_string(), false);
        cache.insert("metric3".to_string(), true);

        let metric1 = "metric1".to_string();
        let metric4 = "metric4".to_string();
        let names1 = vec![&metric1, &metric4];
        let result = fast_legacy_check(&cache, &names1);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), Some(true));
        assert!(cache.contains_key("metric4"));
        assert!(*cache.get("metric4").unwrap().value());

        // Case 2: complete cache miss returns None.
        let metric5 = "metric5".to_string();
        let metric6 = "metric6".to_string();
        let names2 = vec![&metric5, &metric6];
        let result = fast_legacy_check(&cache, &names2);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), None);

        // Case 3: mixing legacy and non-legacy entries is an error.
        let cache_incompatible = DashMap::new();
        cache_incompatible.insert("metric1".to_string(), true);
        cache_incompatible.insert("metric2".to_string(), false);
        let metric1_test = "metric1".to_string();
        let metric2_test = "metric2".to_string();
        let names3 = vec![&metric1_test, &metric2_test];
        let result = fast_legacy_check(&cache_incompatible, &names3);
        assert!(result.is_err());

        // Case 4: concurrent readers and writers must not deadlock.
        let cache_concurrent = Arc::new(DashMap::new());
        cache_concurrent.insert("shared_metric".to_string(), true);

        let num_threads = 8;
        let operations_per_thread = 100;
        let barrier = Arc::new(Barrier::new(num_threads));
        let success_flag = Arc::new(AtomicBool::new(true));

        let handles: Vec<_> = (0..num_threads)
            .map(|thread_id| {
                let cache_clone = Arc::clone(&cache_concurrent);
                let barrier_clone = Arc::clone(&barrier);
                let success_flag_clone = Arc::clone(&success_flag);

                thread::spawn(move || {
                    barrier_clone.wait();

                    let start_time = Instant::now();
                    for i in 0..operations_per_thread {
                        let shared_metric = "shared_metric".to_string();
                        let new_metric = format!("thread_{}_metric_{}", thread_id, i);
                        let names = vec![&shared_metric, &new_metric];

                        match fast_legacy_check(&cache_clone, &names) {
                            Ok(_) => {}
                            Err(_) => {
                                success_flag_clone.store(false, Ordering::Relaxed);
                                return;
                            }
                        }

                        if start_time.elapsed() > Duration::from_secs(10) {
                            success_flag_clone.store(false, Ordering::Relaxed);
                            return;
                        }
                    }
                })
            })
            .collect();

        let start_time = Instant::now();
        for (i, handle) in handles.into_iter().enumerate() {
            let join_result = handle.join();

            if start_time.elapsed() > Duration::from_secs(30) {
                panic!("Test timed out - possible deadlock detected!");
            }

            if join_result.is_err() {
                panic!("Thread {} panicked during execution", i);
            }
        }

        assert!(
            success_flag.load(Ordering::Relaxed),
            "Some operations failed"
        );

        let final_count = cache_concurrent.len();
        assert!(
            final_count > 1 + num_threads * operations_per_thread / 2,
            "Expected more cache entries, got {}",
            final_count
        );
    }
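
    // Added sanity check for `should_capture_statement` (a sketch: assumes
    // `SELECT 1` parses to `Statement::Query` under GreptimeDbDialect).
    #[test]
    fn test_should_capture_statement() {
        let stmts = parse_stmt("SELECT 1", &GreptimeDbDialect {}).unwrap();
        assert!(should_capture_statement(stmts.first()));
        assert!(!should_capture_statement(None));
    }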

    #[test]
    fn test_exec_validation() {
        let query_ctx = QueryContext::arc();
        let plugins: Plugins = Plugins::new();
        plugins.insert(QueryOptions {
            disallow_cross_catalog_query: true,
        });

        let sql = r#"
            SELECT * FROM demo;
            EXPLAIN SELECT * FROM demo;
            CREATE DATABASE test_database;
            SHOW DATABASES;
        "#;
        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        assert_eq!(stmts.len(), 4);
        for stmt in stmts {
            let re = check_permission(plugins.clone(), &stmt, &query_ctx);
            re.unwrap();
        }

        let sql = r#"
            SHOW CREATE TABLE demo;
            ALTER TABLE demo ADD COLUMN new_col INT;
        "#;
        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        assert_eq!(stmts.len(), 2);
        for stmt in stmts {
            let re = check_permission(plugins.clone(), &stmt, &query_ctx);
            re.unwrap();
        }

        fn replace_test(template_sql: &str, plugins: Plugins, query_ctx: &QueryContextRef) {
            // Prefixes that resolve to the current catalog and schema.
            let right = vec![("", ""), ("", "public."), ("greptime.", "public.")];
            for (catalog, schema) in right {
                let sql = do_fmt(template_sql, catalog, schema);
                do_test(&sql, plugins.clone(), query_ctx, true);
            }

            let wrong = vec![
                ("wrongcatalog.", "public."),
                ("wrongcatalog.", "wrongschema."),
            ];
            for (catalog, schema) in wrong {
                let sql = do_fmt(template_sql, catalog, schema);
                do_test(&sql, plugins.clone(), query_ctx, false);
            }
        }

        fn do_fmt(template: &str, catalog: &str, schema: &str) -> String {
            let vars = HashMap::from([
                ("catalog".to_string(), catalog),
                ("schema".to_string(), schema),
            ]);
            template.format(&vars).unwrap()
        }

        fn do_test(sql: &str, plugins: Plugins, query_ctx: &QueryContextRef, is_ok: bool) {
            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
            let re = check_permission(plugins, stmt, query_ctx);
            if is_ok {
                re.unwrap();
            } else {
                assert!(re.is_err());
            }
        }

        // INSERT
        let sql = "INSERT INTO {catalog}{schema}monitor(host) VALUES ('host1');";
        replace_test(sql, plugins.clone(), &query_ctx);

        // CREATE TABLE
        let sql = r#"CREATE TABLE {catalog}{schema}demo(
                        host STRING,
                        ts TIMESTAMP,
                        TIME INDEX (ts),
                        PRIMARY KEY(host)
                    ) engine=mito;"#;
        replace_test(sql, plugins.clone(), &query_ctx);

        // DROP TABLE
        let sql = "DROP TABLE {catalog}{schema}demo;";
        replace_test(sql, plugins.clone(), &query_ctx);

        // SHOW TABLES
        let sql = "SHOW TABLES FROM public";
        let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        check_permission(plugins.clone(), &stmt[0], &query_ctx).unwrap();

        let sql = "SHOW TABLES FROM private";
        let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        let re = check_permission(plugins.clone(), &stmt[0], &query_ctx);
        // Schema existence is not checked here, so this still passes.
        assert!(re.is_ok());

        // DESC TABLE
        let sql = "DESC TABLE {catalog}{schema}demo;";
        replace_test(sql, plugins, &query_ctx);
    }
}