pub mod builder;
mod grpc;
mod influxdb;
mod jaeger;
mod log_handler;
mod logs;
mod opentsdb;
mod otlp;
pub mod prom_store;
mod promql;
mod region_query;
pub mod standalone;

use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use async_stream::stream;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use catalog::CatalogManagerRef;
use catalog::process_manager::{
    ProcessManagerRef, QueryStatement as CatalogQueryStatement, SlowQueryTimer,
};
use client::OutputData;
use common_base::Plugins;
use common_base::cancellation::CancellableFuture;
use common_config::KvBackendConfig;
use common_error::ext::{BoxedError, ErrorExt};
use common_event_recorder::EventRecorderRef;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::runtime_switch::RuntimeSwitchManager;
use common_meta::key::table_name::TableNameKey;
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::NodeManagerRef;
use common_meta::procedure_executor::ProcedureExecutorRef;
use common_meta::state_store::KvStateStore;
use common_procedure::ProcedureManagerRef;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::options::ProcedureConfig;
use common_query::Output;
use common_recordbatch::RecordBatchStreamWrapper;
use common_recordbatch::error::StreamTimeoutSnafu;
use common_telemetry::logging::SlowQueryOptions;
use common_telemetry::{debug, error, info, tracing};
use dashmap::DashMap;
use datafusion_expr::LogicalPlan;
use futures::{Stream, StreamExt};
use lazy_static::lazy_static;
use log_store::raft_engine::RaftEngineBackend;
use operator::delete::DeleterRef;
use operator::insert::InserterRef;
use operator::statement::{StatementExecutor, StatementExecutorRef};
use partition::manager::PartitionRuleManagerRef;
use pipeline::pipeline_operator::PipelineOperator;
use prometheus::HistogramTimer;
use promql_parser::label::Matcher;
use query::QueryEngineRef;
use query::metrics::OnDone;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::query_engine::DescribeResult;
use query::query_engine::options::{QueryOptions, validate_catalog_and_schema};
use servers::error::{
    self as server_error, AuthSnafu, CommonMetaSnafu, ExecuteQuerySnafu,
    OtlpMetricModeIncompatibleSnafu, ParsePromQLSnafu, UnexpectedResultSnafu,
};
use servers::interceptor::{
    PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
};
use servers::otlp::metrics::legacy_normalize_otlp_name;
use servers::prometheus_handler::PrometheusHandler;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::{Channel, QueryContextRef};
use session::table_name::table_idents_to_full_name;
use snafu::prelude::*;
use sql::ast::ObjectNamePartExt;
use sql::dialect::Dialect;
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::copy::{CopyDatabase, CopyTable};
use sql::statements::statement::Statement;
use sql::statements::tql::Tql;
use sqlparser::ast::ObjectName;
pub use standalone::StandaloneDatanodeManager;
use table::requests::{OTLP_METRIC_COMPAT_KEY, OTLP_METRIC_COMPAT_PROM};

use crate::error::{
    self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, InvalidSqlSnafu,
    ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
    StatementTimeoutSnafu, TableOperationSnafu,
};
use crate::limiter::LimiterRef;
use crate::stream_wrapper::CancellableStreamWrapper;

lazy_static! {
    static ref OTLP_LEGACY_DEFAULT_VALUE: String = "legacy".to_string();
}

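/// The frontend [`Instance`] bundles the components a frontend node needs to
/// parse, plan, and execute queries, and implements the server-facing query
/// handler traits such as [`SqlQueryHandler`] and [`PrometheusHandler`].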
#[derive(Clone)]
pub struct Instance {
    catalog_manager: CatalogManagerRef,
    pipeline_operator: Arc<PipelineOperator>,
    statement_executor: Arc<StatementExecutor>,
    query_engine: QueryEngineRef,
    plugins: Plugins,
    inserter: InserterRef,
    deleter: DeleterRef,
    table_metadata_manager: TableMetadataManagerRef,
    event_recorder: Option<EventRecorderRef>,
    limiter: Option<LimiterRef>,
    process_manager: ProcessManagerRef,
    slow_query_options: SlowQueryOptions,

    /// Per-database cache of whether an OTLP metrics table is in legacy mode:
    /// the outer key is the database string, the inner key is the metric
    /// (table) name, and the value is the legacy flag.
    otlp_metrics_table_legacy_cache: DashMap<String, DashMap<String, bool>>,
}

impl Instance {
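    /// Builds the components standalone mode needs for metadata: a local
    /// `RaftEngineBackend` as the kv backend, plus a `LocalManager` layered on
    /// top of it (via [`KvStateStore`]) as the procedure manager.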
    pub async fn try_build_standalone_components(
        dir: String,
        kv_backend_config: KvBackendConfig,
        procedure_config: ProcedureConfig,
    ) -> Result<(KvBackendRef, ProcedureManagerRef)> {
        info!(
            "Creating metadata kvbackend with config: {:?}",
            kv_backend_config
        );
        let kv_backend = RaftEngineBackend::try_open_with_cfg(dir, &kv_backend_config)
            .map_err(BoxedError::new)
            .context(error::OpenRaftEngineBackendSnafu)?;

        let kv_backend = Arc::new(kv_backend);
        let kv_state_store = Arc::new(KvStateStore::new(kv_backend.clone()));

        let manager_config = ManagerConfig {
            max_retry_times: procedure_config.max_retry_times,
            retry_delay: procedure_config.retry_delay,
            max_running_procedures: procedure_config.max_running_procedures,
            ..Default::default()
        };
        let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend.clone()));
        let procedure_manager = Arc::new(LocalManager::new(
            manager_config,
            kv_state_store.clone(),
            kv_state_store,
            Some(runtime_switch_manager),
            None,
        ));

        Ok((kv_backend, procedure_manager))
    }

    pub fn catalog_manager(&self) -> &CatalogManagerRef {
        &self.catalog_manager
    }

    pub fn query_engine(&self) -> &QueryEngineRef {
        &self.query_engine
    }

    pub fn plugins(&self) -> &Plugins {
        &self.plugins
    }

    pub fn statement_executor(&self) -> &StatementExecutorRef {
        &self.statement_executor
    }

    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
        &self.table_metadata_manager
    }

    pub fn inserter(&self) -> &InserterRef {
        &self.inserter
    }

    pub fn process_manager(&self) -> &ProcessManagerRef {
        &self.process_manager
    }

    pub fn node_manager(&self) -> &NodeManagerRef {
        self.inserter.node_manager()
    }

    pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
        self.inserter.partition_manager()
    }

    pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
        self.statement_executor.cache_invalidator()
    }

    pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
        self.statement_executor.procedure_executor()
    }
}

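/// Parses a raw SQL string into a list of [`Statement`]s using the session's
/// dialect. A minimal sketch of how it is driven, mirroring the tests at the
/// bottom of this file (which use `GreptimeDbDialect`):
///
/// ```ignore
/// use sql::dialect::GreptimeDbDialect;
///
/// let stmts = parse_stmt("SELECT * FROM demo; SHOW DATABASES;", &GreptimeDbDialect {}).unwrap();
/// assert_eq!(stmts.len(), 2);
/// ```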
fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
    ParserContext::create_with_dialect(sql, dialect, ParseOptions::default()).context(ParseSqlSnafu)
}

impl Instance {
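    /// Executes a single parsed statement: checks permissions, optionally
    /// registers the query with the process manager (attaching a slow-query
    /// timer when enabled), and wraps streaming output so that it is
    /// cancelled together with the query ticket.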
    async fn query_statement(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
        check_permission(self.plugins.clone(), &stmt, &query_ctx)?;

        let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
        let query_interceptor = query_interceptor.as_ref();

        if should_capture_statement(Some(&stmt)) {
            let slow_query_timer = self
                .slow_query_options
                .enable
                .then(|| self.event_recorder.clone())
                .flatten()
                .map(|event_recorder| {
                    SlowQueryTimer::new(
                        CatalogQueryStatement::Sql(stmt.clone()),
                        self.slow_query_options.threshold,
                        self.slow_query_options.sample_ratio,
                        self.slow_query_options.record_type,
                        event_recorder,
                    )
                });

            let ticket = self.process_manager.register_query(
                query_ctx.current_catalog().to_string(),
                vec![query_ctx.current_schema()],
                stmt.to_string(),
                query_ctx.conn_info().to_string(),
                Some(query_ctx.process_id()),
                slow_query_timer,
            );

            let query_fut = self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor);

            CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
                .await
                .map_err(|_| error::CancelledSnafu.build())?
                .map(|output| {
                    let Output { meta, data } = output;

                    let data = match data {
                        OutputData::Stream(stream) => OutputData::Stream(Box::pin(
                            CancellableStreamWrapper::new(stream, ticket),
                        )),
                        other => other,
                    };
                    Output { data, meta }
                })
        } else {
            self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor)
                .await
        }
    }

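    /// Executes the statement under the timeout derived from the session, if
    /// any; whatever budget remains after execution is attached to streaming
    /// outputs via [`attach_timeout`].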
    async fn exec_statement_with_timeout(
        &self,
        stmt: Statement,
        query_ctx: QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
    ) -> Result<Output> {
        let timeout = derive_timeout(&stmt, &query_ctx);
        match timeout {
            Some(timeout) => {
                let start = tokio::time::Instant::now();
                let output = tokio::time::timeout(
                    timeout,
                    self.exec_statement(stmt, query_ctx, query_interceptor),
                )
                .await
                .map_err(|_| StatementTimeoutSnafu.build())??;
                // Attach whatever budget is left to streaming outputs.
                let remaining_timeout = timeout.checked_sub(start.elapsed()).unwrap_or_default();
                attach_timeout(output, remaining_timeout)
            }
            None => {
                self.exec_statement(stmt, query_ctx, query_interceptor)
                    .await
            }
        }
    }

    async fn exec_statement(
        &self,
        stmt: Statement,
        query_ctx: QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
    ) -> Result<Output> {
        match stmt {
            Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
                if let Statement::Explain(explain) = &stmt
                    && let Some(format) = explain.format()
                {
                    query_ctx.set_explain_format(format.to_string());
                }

                self.plan_and_exec_sql(stmt, &query_ctx, query_interceptor)
                    .await
            }
            Statement::Tql(tql) => {
                self.plan_and_exec_tql(&query_ctx, query_interceptor, tql)
                    .await
            }
            _ => {
                query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?;
                self.statement_executor
                    .execute_sql(stmt, query_ctx)
                    .await
                    .context(TableOperationSnafu)
            }
        }
    }

    async fn plan_and_exec_sql(
        &self,
        stmt: Statement,
        query_ctx: &QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
    ) -> Result<Output> {
        let stmt = QueryStatement::Sql(stmt);
        let plan = self
            .statement_executor
            .plan(&stmt, query_ctx.clone())
            .await?;
        let QueryStatement::Sql(stmt) = stmt else {
            unreachable!()
        };
        query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?;
        self.statement_executor
            .exec_plan(plan, query_ctx.clone())
            .await
            .context(TableOperationSnafu)
    }

    async fn plan_and_exec_tql(
        &self,
        query_ctx: &QueryContextRef,
        query_interceptor: Option<&SqlQueryInterceptorRef<Error>>,
        tql: Tql,
    ) -> Result<Output> {
        let plan = self
            .statement_executor
            .plan_tql(tql.clone(), query_ctx)
            .await?;
        query_interceptor.pre_execute(&Statement::Tql(tql), Some(&plan), query_ctx.clone())?;
        self.statement_executor
            .exec_plan(plan, query_ctx.clone())
            .await
            .context(TableOperationSnafu)
    }

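    /// Decides whether the given OTLP metric names must be written in legacy
    /// mode. The per-database cache is consulted first; on a full miss the
    /// table metadata is loaded and the verdict cached. Mixing tables in
    /// "prom" and "legacy" compat modes within one request is rejected.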
    async fn check_otlp_legacy(
        &self,
        names: &[&String],
        ctx: QueryContextRef,
    ) -> server_error::Result<bool> {
        let db_string = ctx.get_db_string();
        let cache = self
            .otlp_metrics_table_legacy_cache
            .entry(db_string.clone())
            .or_default();
        if let Some(flag) = fast_legacy_check(&cache, names)? {
            return Ok(flag);
        }
        // Release the entry guard before any `.await`: holding a DashMap
        // shard lock across an await point can deadlock.
        drop(cache);

        let catalog = ctx.current_catalog();
        let schema = ctx.current_schema();

        // Cache miss: resolve the normalized metric names to table ids.
        let normalized_names = names
            .iter()
            .map(|n| legacy_normalize_otlp_name(n))
            .collect::<Vec<_>>();
        let table_names = normalized_names
            .iter()
            .map(|n| TableNameKey::new(catalog, &schema, n))
            .collect::<Vec<_>>();
        let table_values = self
            .table_metadata_manager()
            .table_name_manager()
            .batch_get(table_names)
            .await
            .context(CommonMetaSnafu)?;
        let table_ids = table_values
            .into_iter()
            .filter_map(|v| v.map(|vi| vi.table_id()))
            .collect::<Vec<_>>();

        // None of the tables exist yet: default to non-legacy mode.
        if table_ids.is_empty() {
            let cache = self
                .otlp_metrics_table_legacy_cache
                .entry(db_string)
                .or_default();
            names.iter().for_each(|name| {
                cache.insert(name.to_string(), false);
            });
            return Ok(false);
        }

        let table_infos = self
            .table_metadata_manager()
            .table_info_manager()
            .batch_get(&table_ids)
            .await
            .context(CommonMetaSnafu)?;
        let options = table_infos
            .values()
            .map(|info| {
                info.table_info
                    .meta
                    .options
                    .extra_options
                    .get(OTLP_METRIC_COMPAT_KEY)
                    .unwrap_or(&OTLP_LEGACY_DEFAULT_VALUE)
            })
            .collect::<Vec<_>>();
        let cache = self
            .otlp_metrics_table_legacy_cache
            .entry(db_string)
            .or_default();
        if !options.is_empty() {
            // All touched tables must agree on one compat mode.
            let has_prom = options.iter().any(|opt| *opt == OTLP_METRIC_COMPAT_PROM);
            let has_legacy = options
                .iter()
                .any(|opt| *opt == OTLP_LEGACY_DEFAULT_VALUE.as_str());
            ensure!(!(has_prom && has_legacy), OtlpMetricModeIncompatibleSnafu);
            let flag = has_legacy;
            names.iter().for_each(|name| {
                cache.insert(name.to_string(), flag);
            });
            Ok(flag)
        } else {
            names.iter().for_each(|name| {
                cache.insert(name.to_string(), false);
            });
            Ok(false)
        }
    }
}

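/// Fast path of the legacy check that answers purely from the per-database
/// cache: returns `Ok(Some(flag))` when at least one name is cached (and
/// back-fills the uncached names with the same flag), `Ok(None)` on a
/// complete miss, and an error when cached entries disagree. A minimal sketch
/// of the contract, mirroring `test_fast_legacy_check_deadlock_prevention`
/// below:
///
/// ```ignore
/// let cache = DashMap::new();
/// cache.insert("metric1".to_string(), true);
/// let (m1, m4) = ("metric1".to_string(), "metric4".to_string());
/// // "metric1" hits the cache as legacy, so "metric4" inherits the flag.
/// assert_eq!(fast_legacy_check(&cache, &[&m1, &m4]).unwrap(), Some(true));
/// assert!(*cache.get("metric4").unwrap().value());
/// ```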
fn fast_legacy_check(
    cache: &DashMap<String, bool>,
    names: &[&String],
) -> server_error::Result<Option<bool>> {
    let hit_cache = names
        .iter()
        .filter_map(|name| cache.get(*name))
        .collect::<Vec<_>>();
    if !hit_cache.is_empty() {
        let hit_legacy = hit_cache.iter().any(|en| *en.value());
        let hit_prom = hit_cache.iter().any(|en| !*en.value());

        // Cached verdicts must agree: a batch touching both legacy and
        // non-legacy tables is rejected.
        ensure!(!(hit_legacy && hit_prom), OtlpMetricModeIncompatibleSnafu);

        let flag = hit_legacy;
        // Release the read guards before inserting below: a `get` guard held
        // on the same shard would deadlock with `insert`.
        drop(hit_cache);

        // Back-fill the names that missed the cache with the agreed flag.
        names.iter().for_each(|name| {
            if !cache.contains_key(*name) {
                cache.insert(name.to_string(), flag);
            }
        });
        Ok(Some(flag))
    } else {
        Ok(None)
    }
}

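/// Derives the effective timeout for a statement, if any. A zero session
/// timeout disables enforcement; MySQL enforces the timeout only for
/// read-only statements, while Postgres enforces it for all statements.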
fn derive_timeout(stmt: &Statement, query_ctx: &QueryContextRef) -> Option<Duration> {
    let query_timeout = query_ctx.query_timeout()?;
    if query_timeout.is_zero() {
        return None;
    }
    match query_ctx.channel() {
        Channel::Mysql if stmt.is_readonly() => Some(query_timeout),
        Channel::Postgres => Some(query_timeout),
        _ => None,
    }
}

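/// Wraps a streaming [`Output`] so the remaining timeout budget shrinks as
/// batches are produced, failing the stream once the budget is exhausted.
/// Fails immediately if the budget is already zero; non-streaming outputs are
/// returned untouched.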
fn attach_timeout(output: Output, mut timeout: Duration) -> Result<Output> {
    if timeout.is_zero() {
        return StatementTimeoutSnafu.fail();
    }

    let output = match output.data {
        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
        OutputData::Stream(mut stream) => {
            let schema = stream.schema();
            let s = Box::pin(stream! {
                let mut start = tokio::time::Instant::now();
                while let Some(item) = tokio::time::timeout(timeout, stream.next()).await.map_err(|_| StreamTimeoutSnafu.build())? {
                    yield item;

                    // Shrink the remaining budget by the time spent producing
                    // this batch; stop the stream once it is used up.
                    let now = tokio::time::Instant::now();
                    timeout = timeout.checked_sub(now - start).unwrap_or(Duration::ZERO);
                    start = now;
                    if timeout.is_zero() {
                        StreamTimeoutSnafu.fail()?;
                    }
                }
            }) as Pin<Box<dyn Stream<Item = _> + Send>>;
            let stream = RecordBatchStreamWrapper {
                schema,
                stream: s,
                output_ordering: None,
                metrics: Default::default(),
            };
            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
        }
    };

    Ok(output)
}

#[async_trait]
impl SqlQueryHandler for Instance {
    type Error = Error;

    #[tracing::instrument(skip_all)]
    async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
        let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
        let query_interceptor = query_interceptor_opt.as_ref();
        let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) {
            Ok(q) => q,
            Err(e) => return vec![Err(e)],
        };

        let checker_ref = self.plugins.get::<PermissionCheckerRef>();
        let checker = checker_ref.as_ref();

        match parse_stmt(query.as_ref(), query_ctx.sql_dialect())
            .and_then(|stmts| query_interceptor.post_parsing(stmts, query_ctx.clone()))
        {
            Ok(stmts) => {
                if stmts.is_empty() {
                    return vec![
                        InvalidSqlSnafu {
                            err_msg: "empty statements",
                        }
                        .fail(),
                    ];
                }

                let mut results = Vec::with_capacity(stmts.len());
                for stmt in stmts {
                    if let Err(e) = checker
                        .check_permission(
                            query_ctx.current_user(),
                            PermissionReq::SqlStatement(&stmt),
                        )
                        .context(PermissionSnafu)
                    {
                        results.push(Err(e));
                        break;
                    }

                    match self.query_statement(stmt.clone(), query_ctx.clone()).await {
                        Ok(output) => {
                            let output_result =
                                query_interceptor.post_execute(output, query_ctx.clone());
                            results.push(output_result);
                        }
                        Err(e) => {
                            if e.status_code().should_log_error() {
                                error!(e; "Failed to execute query: {stmt}");
                            } else {
                                debug!("Failed to execute query: {stmt}, {e}");
                            }
                            results.push(Err(e));
                            break;
                        }
                    }
                }
                results
            }
            Err(e) => {
                vec![Err(e)]
            }
        }
    }

    async fn do_exec_plan(
        &self,
        stmt: Option<Statement>,
        plan: LogicalPlan,
        query_ctx: QueryContextRef,
    ) -> Result<Output> {
        if should_capture_statement(stmt.as_ref()) {
            // `should_capture_statement` guarantees the statement is present.
            let stmt = stmt.unwrap();
            let query = stmt.to_string();
            let slow_query_timer = self
                .slow_query_options
                .enable
                .then(|| self.event_recorder.clone())
                .flatten()
                .map(|event_recorder| {
                    SlowQueryTimer::new(
                        CatalogQueryStatement::Sql(stmt.clone()),
                        self.slow_query_options.threshold,
                        self.slow_query_options.sample_ratio,
                        self.slow_query_options.record_type,
                        event_recorder,
                    )
                });

            let ticket = self.process_manager.register_query(
                query_ctx.current_catalog().to_string(),
                vec![query_ctx.current_schema()],
                query,
                query_ctx.conn_info().to_string(),
                Some(query_ctx.process_id()),
                slow_query_timer,
            );

            let query_fut = self.query_engine.execute(plan.clone(), query_ctx);

            CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
                .await
                .map_err(|_| error::CancelledSnafu.build())?
                .map(|output| {
                    let Output { meta, data } = output;

                    let data = match data {
                        OutputData::Stream(stream) => OutputData::Stream(Box::pin(
                            CancellableStreamWrapper::new(stream, ticket),
                        )),
                        other => other,
                    };
                    Output { data, meta }
                })
                .context(ExecLogicalPlanSnafu)
        } else {
            self.query_engine
                .execute(plan.clone(), query_ctx)
                .await
                .context(ExecLogicalPlanSnafu)
        }
    }

    #[tracing::instrument(skip_all)]
    async fn do_promql_query(
        &self,
        query: &PromQuery,
        query_ctx: QueryContextRef,
    ) -> Vec<Result<Output>> {
        let result = PrometheusHandler::do_query(self, query, query_ctx)
            .await
            .with_context(|_| ExecutePromqlSnafu {
                query: format!("{query:?}"),
            });
        vec![result]
    }

    async fn do_describe(
        &self,
        stmt: Statement,
        query_ctx: QueryContextRef,
    ) -> Result<Option<DescribeResult>> {
        if matches!(
            stmt,
            Statement::Insert(_) | Statement::Query(_) | Statement::Delete(_)
        ) {
            self.plugins
                .get::<PermissionCheckerRef>()
                .as_ref()
                .check_permission(query_ctx.current_user(), PermissionReq::SqlStatement(&stmt))
                .context(PermissionSnafu)?;

            let plan = self
                .query_engine
                .planner()
                .plan(&QueryStatement::Sql(stmt), query_ctx.clone())
                .await
                .context(PlanStatementSnafu)?;
            self.query_engine
                .describe(plan, query_ctx)
                .await
                .map(Some)
                .context(error::DescribeStatementSnafu)
        } else {
            Ok(None)
        }
    }

    async fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result<bool> {
        self.catalog_manager
            .schema_exists(catalog, schema, None)
            .await
            .context(error::CatalogSnafu)
    }
}

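/// Attaches a Prometheus [`HistogramTimer`] to a streaming [`Output`]: the
/// timer observes its duration once the stream is fully consumed. For
/// non-streaming outputs the timer is simply dropped.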
pub fn attach_timer(output: Output, timer: HistogramTimer) -> Output {
    match output.data {
        OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
        OutputData::Stream(stream) => {
            let stream = OnDone::new(stream, move || {
                timer.observe_duration();
            });
            Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
        }
    }
}

#[async_trait]
impl PrometheusHandler for Instance {
    #[tracing::instrument(skip_all)]
    async fn do_query(
        &self,
        query: &PromQuery,
        query_ctx: QueryContextRef,
    ) -> server_error::Result<Output> {
        let interceptor = self
            .plugins
            .get::<PromQueryInterceptorRef<server_error::Error>>();

        self.plugins
            .get::<PermissionCheckerRef>()
            .as_ref()
            .check_permission(query_ctx.current_user(), PermissionReq::PromQuery)
            .context(AuthSnafu)?;

        let stmt = QueryLanguageParser::parse_promql(query, &query_ctx).with_context(|_| {
            ParsePromQLSnafu {
                query: query.clone(),
            }
        })?;

        let plan = self
            .statement_executor
            .plan(&stmt, query_ctx.clone())
            .await
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)?;

        interceptor.pre_execute(query, Some(&plan), query_ctx.clone())?;

        let query_statement = if let QueryStatement::Promql(eval_stmt) = stmt {
            CatalogQueryStatement::Promql(eval_stmt)
        } else {
            return UnexpectedResultSnafu {
                reason: "The query should always be promql.".to_string(),
            }
            .fail();
        };
        let query = query_statement.to_string();

        let slow_query_timer = self
            .slow_query_options
            .enable
            .then(|| self.event_recorder.clone())
            .flatten()
            .map(|event_recorder| {
                SlowQueryTimer::new(
                    query_statement,
                    self.slow_query_options.threshold,
                    self.slow_query_options.sample_ratio,
                    self.slow_query_options.record_type,
                    event_recorder,
                )
            });

        let ticket = self.process_manager.register_query(
            query_ctx.current_catalog().to_string(),
            vec![query_ctx.current_schema()],
            query,
            query_ctx.conn_info().to_string(),
            Some(query_ctx.process_id()),
            slow_query_timer,
        );

        let query_fut = self.statement_executor.exec_plan(plan, query_ctx.clone());

        let output = CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
            .await
            .map_err(|_| servers::error::CancelledSnafu.build())?
            .map(|output| {
                let Output { meta, data } = output;
                let data = match data {
                    OutputData::Stream(stream) => {
                        OutputData::Stream(Box::pin(CancellableStreamWrapper::new(stream, ticket)))
                    }
                    other => other,
                };
                Output { data, meta }
            })
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)?;

        Ok(interceptor.post_execute(output, query_ctx)?)
    }

    async fn query_metric_names(
        &self,
        matchers: Vec<Matcher>,
        ctx: &QueryContextRef,
    ) -> server_error::Result<Vec<String>> {
        self.handle_query_metric_names(matchers, ctx)
            .await
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)
    }

    async fn query_label_values(
        &self,
        metric: String,
        label_name: String,
        matchers: Vec<Matcher>,
        start: SystemTime,
        end: SystemTime,
        ctx: &QueryContextRef,
    ) -> server_error::Result<Vec<String>> {
        self.handle_query_label_values(metric, label_name, matchers, start, end, ctx)
            .await
            .map_err(BoxedError::new)
            .context(ExecuteQuerySnafu)
    }

    fn catalog_manager(&self) -> CatalogManagerRef {
        self.catalog_manager.clone()
    }
}

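/// Validates that the optional `database` field of a SHOW-style statement
/// stays within the session's current catalog.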
macro_rules! validate_db_permission {
    ($stmt: expr, $query_ctx: expr) => {
        if let Some(database) = &$stmt.database {
            validate_catalog_and_schema($query_ctx.current_catalog(), database, $query_ctx)
                .map_err(BoxedError::new)
                .context(SqlExecInterceptedSnafu)?;
        }
    };
}

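/// Checks that `stmt` does not reach outside the session's catalog when the
/// `disallow_cross_catalog_query` option is enabled. A sketch of the call
/// pattern, as exercised by `test_exec_validation` below:
///
/// ```ignore
/// let plugins = Plugins::new();
/// plugins.insert(QueryOptions { disallow_cross_catalog_query: true });
/// let stmt = &parse_stmt("SELECT * FROM demo;", &GreptimeDbDialect {}).unwrap()[0];
/// check_permission(plugins, stmt, &QueryContext::arc()).unwrap();
/// ```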
pub fn check_permission(
    plugins: Plugins,
    stmt: &Statement,
    query_ctx: &QueryContextRef,
) -> Result<()> {
    let need_validate = plugins
        .get::<QueryOptions>()
        .map(|opts| opts.disallow_cross_catalog_query)
        .unwrap_or_default();

    if !need_validate {
        return Ok(());
    }

    match stmt {
        Statement::Admin(_) => {}
        Statement::Query(_)
        | Statement::Explain(_)
        | Statement::Tql(_)
        | Statement::Delete(_)
        | Statement::DeclareCursor(_)
        | Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(_)) => {}
        Statement::CreateDatabase(_)
        | Statement::ShowDatabases(_)
        | Statement::DropDatabase(_)
        | Statement::AlterDatabase(_)
        | Statement::DropFlow(_)
        | Statement::Use(_) => {}
        #[cfg(feature = "enterprise")]
        Statement::DropTrigger(_) => {}
        Statement::ShowCreateDatabase(stmt) => {
            validate_database(&stmt.database_name, query_ctx)?;
        }
        Statement::ShowCreateTable(stmt) => {
            validate_param(&stmt.table_name, query_ctx)?;
        }
        Statement::ShowCreateFlow(stmt) => {
            validate_param(&stmt.flow_name, query_ctx)?;
        }
        Statement::ShowCreateView(stmt) => {
            validate_param(&stmt.view_name, query_ctx)?;
        }
        Statement::CreateExternalTable(stmt) => {
            validate_param(&stmt.name, query_ctx)?;
        }
        Statement::CreateFlow(stmt) => {
            validate_param(&stmt.sink_table_name, query_ctx)?;
        }
        #[cfg(feature = "enterprise")]
        Statement::CreateTrigger(stmt) => {
            validate_param(&stmt.trigger_name, query_ctx)?;
        }
        Statement::CreateView(stmt) => {
            validate_param(&stmt.name, query_ctx)?;
        }
        Statement::AlterTable(stmt) => {
            validate_param(stmt.table_name(), query_ctx)?;
        }
        #[cfg(feature = "enterprise")]
        Statement::AlterTrigger(_) => {}
        Statement::SetVariables(_) | Statement::ShowVariables(_) => {}
        Statement::ShowCharset(_) | Statement::ShowCollation(_) => {}

        Statement::Insert(insert) => {
            let name = insert.table_name().context(ParseSqlSnafu)?;
            validate_param(name, query_ctx)?;
        }
        Statement::CreateTable(stmt) => {
            validate_param(&stmt.name, query_ctx)?;
        }
        Statement::CreateTableLike(stmt) => {
            validate_param(&stmt.table_name, query_ctx)?;
            validate_param(&stmt.source_name, query_ctx)?;
        }
        Statement::DropTable(drop_stmt) => {
            for table_name in drop_stmt.table_names() {
                validate_param(table_name, query_ctx)?;
            }
        }
        Statement::DropView(stmt) => {
            validate_param(&stmt.view_name, query_ctx)?;
        }
        Statement::ShowTables(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowTableStatus(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowColumns(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowIndex(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowRegion(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowViews(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        Statement::ShowFlows(stmt) => {
            validate_db_permission!(stmt, query_ctx);
        }
        #[cfg(feature = "enterprise")]
        Statement::ShowTriggers(_stmt) => {}
        Statement::ShowStatus(_stmt) => {}
        Statement::ShowSearchPath(_stmt) => {}
        Statement::DescribeTable(stmt) => {
            validate_param(stmt.name(), query_ctx)?;
        }
        Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => match stmt {
            CopyTable::To(copy_table_to) => validate_param(&copy_table_to.table_name, query_ctx)?,
            CopyTable::From(copy_table_from) => {
                validate_param(&copy_table_from.table_name, query_ctx)?
            }
        },
        Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
            match copy_database {
                CopyDatabase::To(stmt) => validate_database(&stmt.database_name, query_ctx)?,
                CopyDatabase::From(stmt) => validate_database(&stmt.database_name, query_ctx)?,
            }
        }
        Statement::TruncateTable(stmt) => {
            validate_param(stmt.table_name(), query_ctx)?;
        }
        Statement::FetchCursor(_) | Statement::CloseCursor(_) => {}
        Statement::Kill(_) => {}
        Statement::ShowProcesslist(_) => {}
    }
    Ok(())
}

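/// Resolves a table reference to its fully qualified name and validates that
/// the catalog and schema are reachable from the current session.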
fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
    let (catalog, schema, _) = table_idents_to_full_name(name, query_ctx)
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?;

    validate_catalog_and_schema(&catalog, &schema, query_ctx)
        .map_err(BoxedError::new)
        .context(SqlExecInterceptedSnafu)
}

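/// Validates a database reference of the form `<catalog>.<schema>` or plain
/// `<schema>` against the session's catalog.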
fn validate_database(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
    let (catalog, schema) = match &name.0[..] {
        [schema] => (
            query_ctx.current_catalog().to_string(),
            schema.to_string_unquoted(),
        ),
        [catalog, schema] => (catalog.to_string_unquoted(), schema.to_string_unquoted()),
        _ => InvalidSqlSnafu {
            err_msg: format!(
                "expect database name to be <catalog>.<schema> or <schema>, actual: {name}",
            ),
        }
        .fail()?,
    };

    validate_catalog_and_schema(&catalog, &schema, query_ctx)
        .map_err(BoxedError::new)
        .context(SqlExecInterceptedSnafu)
}

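/// Only queries and other read-only statements are captured, i.e. registered
/// with the process manager and timed for slow-query recording.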
fn should_capture_statement(stmt: Option<&Statement>) -> bool {
    if let Some(stmt) = stmt {
        matches!(stmt, Statement::Query(_)) || stmt.is_readonly()
    } else {
        false
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::{Arc, Barrier};
    use std::thread;
    use std::time::{Duration, Instant};

    use common_base::Plugins;
    use query::query_engine::options::QueryOptions;
    use session::context::QueryContext;
    use sql::dialect::GreptimeDbDialect;
    use strfmt::Format;

    use super::*;

    #[test]
    fn test_fast_legacy_check_deadlock_prevention() {
        let cache = DashMap::new();

        // Seed the cache with a mix of legacy and non-legacy entries.
        cache.insert("metric1".to_string(), true);
        cache.insert("metric2".to_string(), false);
        cache.insert("metric3".to_string(), true);

        // Partial hit: the cached legacy flag wins and back-fills "metric4".
        let metric1 = "metric1".to_string();
        let metric4 = "metric4".to_string();
        let names1 = vec![&metric1, &metric4];
        let result = fast_legacy_check(&cache, &names1);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), Some(true));
        assert!(cache.contains_key("metric4"));
        assert!(*cache.get("metric4").unwrap().value());

        // Complete miss: no verdict.
        let metric5 = "metric5".to_string();
        let metric6 = "metric6".to_string();
        let names2 = vec![&metric5, &metric6];
        let result = fast_legacy_check(&cache, &names2);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), None);

        // Conflicting cached modes in one request must error.
        let cache_incompatible = DashMap::new();
        cache_incompatible.insert("metric1".to_string(), true);
        cache_incompatible.insert("metric2".to_string(), false);
        let metric1_test = "metric1".to_string();
        let metric2_test = "metric2".to_string();
        let names3 = vec![&metric1_test, &metric2_test];
        let result = fast_legacy_check(&cache_incompatible, &names3);
        assert!(result.is_err());

        // Hammer a shared cache from many threads to surface deadlocks.
        let cache_concurrent = Arc::new(DashMap::new());
        cache_concurrent.insert("shared_metric".to_string(), true);

        let num_threads = 8;
        let operations_per_thread = 100;
        let barrier = Arc::new(Barrier::new(num_threads));
        let success_flag = Arc::new(AtomicBool::new(true));

        let handles: Vec<_> = (0..num_threads)
            .map(|thread_id| {
                let cache_clone = Arc::clone(&cache_concurrent);
                let barrier_clone = Arc::clone(&barrier);
                let success_flag_clone = Arc::clone(&success_flag);

                thread::spawn(move || {
                    barrier_clone.wait();

                    let start_time = Instant::now();
                    for i in 0..operations_per_thread {
                        let shared_metric = "shared_metric".to_string();
                        let new_metric = format!("thread_{}_metric_{}", thread_id, i);
                        let names = vec![&shared_metric, &new_metric];

                        match fast_legacy_check(&cache_clone, &names) {
                            Ok(_) => {}
                            Err(_) => {
                                success_flag_clone.store(false, Ordering::Relaxed);
                                return;
                            }
                        }

                        if start_time.elapsed() > Duration::from_secs(10) {
                            success_flag_clone.store(false, Ordering::Relaxed);
                            return;
                        }
                    }
                })
            })
            .collect();

        let start_time = Instant::now();
        for (i, handle) in handles.into_iter().enumerate() {
            let join_result = handle.join();

            if start_time.elapsed() > Duration::from_secs(30) {
                panic!("Test timed out - possible deadlock detected!");
            }

            if join_result.is_err() {
                panic!("Thread {} panicked during execution", i);
            }
        }

        assert!(
            success_flag.load(Ordering::Relaxed),
            "Some operations failed"
        );

        // Every thread inserts distinct metrics, so the cache should have grown.
        let final_count = cache_concurrent.len();
        assert!(
            final_count > 1 + num_threads * operations_per_thread / 2,
            "Expected more cache entries, got {}",
            final_count
        );
    }

    #[test]
    fn test_exec_validation() {
        let query_ctx = QueryContext::arc();
        let plugins: Plugins = Plugins::new();
        plugins.insert(QueryOptions {
            disallow_cross_catalog_query: true,
        });

        let sql = r#"
            SELECT * FROM demo;
            EXPLAIN SELECT * FROM demo;
            CREATE DATABASE test_database;
            SHOW DATABASES;
        "#;
        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        assert_eq!(stmts.len(), 4);
        for stmt in stmts {
            let re = check_permission(plugins.clone(), &stmt, &query_ctx);
            re.unwrap();
        }

        let sql = r#"
            SHOW CREATE TABLE demo;
            ALTER TABLE demo ADD COLUMN new_col INT;
        "#;
        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        assert_eq!(stmts.len(), 2);
        for stmt in stmts {
            let re = check_permission(plugins.clone(), &stmt, &query_ctx);
            re.unwrap();
        }

        fn replace_test(template_sql: &str, plugins: Plugins, query_ctx: &QueryContextRef) {
            // Catalog/schema combinations that stay inside the session.
            let right = vec![("", ""), ("", "public."), ("greptime.", "public.")];
            for (catalog, schema) in right {
                let sql = do_fmt(template_sql, catalog, schema);
                do_test(&sql, plugins.clone(), query_ctx, true);
            }

            // Combinations that cross into another catalog.
            let wrong = vec![
                ("wrongcatalog.", "public."),
                ("wrongcatalog.", "wrongschema."),
            ];
            for (catalog, schema) in wrong {
                let sql = do_fmt(template_sql, catalog, schema);
                do_test(&sql, plugins.clone(), query_ctx, false);
            }
        }

        fn do_fmt(template: &str, catalog: &str, schema: &str) -> String {
            let vars = HashMap::from([
                ("catalog".to_string(), catalog),
                ("schema".to_string(), schema),
            ]);
            template.format(&vars).unwrap()
        }

        fn do_test(sql: &str, plugins: Plugins, query_ctx: &QueryContextRef, is_ok: bool) {
            let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
            let re = check_permission(plugins, stmt, query_ctx);
            if is_ok {
                re.unwrap();
            } else {
                assert!(re.is_err());
            }
        }

        // INSERT
        let sql = "INSERT INTO {catalog}{schema}monitor(host) VALUES ('host1');";
        replace_test(sql, plugins.clone(), &query_ctx);

        // CREATE TABLE
        let sql = r#"CREATE TABLE {catalog}{schema}demo(
            host STRING,
            ts TIMESTAMP,
            TIME INDEX (ts),
            PRIMARY KEY(host)
        ) engine=mito;"#;
        replace_test(sql, plugins.clone(), &query_ctx);

        // DROP TABLE
        let sql = "DROP TABLE {catalog}{schema}demo;";
        replace_test(sql, plugins.clone(), &query_ctx);

        // SHOW TABLES
        let sql = "SHOW TABLES FROM public";
        let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        check_permission(plugins.clone(), &stmt[0], &query_ctx).unwrap();

        let sql = "SHOW TABLES FROM private";
        let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        let re = check_permission(plugins.clone(), &stmt[0], &query_ctx);
        assert!(re.is_ok());

        // DESC TABLE
        let sql = "DESC TABLE {catalog}{schema}demo;";
        replace_test(sql, plugins, &query_ctx);
    }
}