1pub mod builder;
16mod grpc;
17mod influxdb;
18mod jaeger;
19mod log_handler;
20mod logs;
21mod opentsdb;
22mod otlp;
23pub mod prom_store;
24mod promql;
25mod region_query;
26pub mod standalone;
27
28use std::sync::Arc;
29use std::time::SystemTime;
30
31use async_trait::async_trait;
32use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
33use catalog::CatalogManagerRef;
34use client::OutputData;
35use common_base::Plugins;
36use common_config::KvBackendConfig;
37use common_error::ext::{BoxedError, ErrorExt};
38use common_meta::key::TableMetadataManagerRef;
39use common_meta::kv_backend::KvBackendRef;
40use common_meta::state_store::KvStateStore;
41use common_procedure::local::{LocalManager, ManagerConfig};
42use common_procedure::options::ProcedureConfig;
43use common_procedure::ProcedureManagerRef;
44use common_query::Output;
45use common_telemetry::{debug, error, info, tracing};
46use datafusion_expr::LogicalPlan;
47use log_store::raft_engine::RaftEngineBackend;
48use operator::delete::DeleterRef;
49use operator::insert::InserterRef;
50use operator::statement::{StatementExecutor, StatementExecutorRef};
51use pipeline::pipeline_operator::PipelineOperator;
52use prometheus::HistogramTimer;
53use promql_parser::label::Matcher;
54use query::metrics::OnDone;
55use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
56use query::query_engine::options::{validate_catalog_and_schema, QueryOptions};
57use query::query_engine::DescribeResult;
58use query::QueryEngineRef;
59use servers::error as server_error;
60use servers::error::{AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu};
61use servers::interceptor::{
62 PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
63};
64use servers::prometheus_handler::PrometheusHandler;
65use servers::query_handler::sql::SqlQueryHandler;
66use session::context::QueryContextRef;
67use session::table_name::table_idents_to_full_name;
68use snafu::prelude::*;
69use sql::dialect::Dialect;
70use sql::parser::{ParseOptions, ParserContext};
71use sql::statements::copy::{CopyDatabase, CopyTable};
72use sql::statements::statement::Statement;
73use sqlparser::ast::ObjectName;
74pub use standalone::StandaloneDatanodeManager;
75
76use crate::error::{
77 self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, InvalidSqlSnafu,
78 ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
79 TableOperationSnafu,
80};
81use crate::limiter::LimiterRef;
82use crate::slow_query_recorder::SlowQueryRecorder;
83
#[derive(Clone)]
/// Shared state behind the `SqlQueryHandler` and `PrometheusHandler` implementations in
/// this file.
///
/// The type derives `Clone`, so every field must itself be cloneable.
pub struct Instance {
    /// Catalog handle; used by `is_valid_schema` and returned by `catalog_manager()`.
    catalog_manager: CatalogManagerRef,
    /// Pipeline operator. NOTE(review): not referenced in this part of the file; presumably
    /// used by the log/pipeline submodules — confirm.
    pipeline_operator: Arc<PipelineOperator>,
    /// Plans and executes SQL, TQL and PromQL statements.
    statement_executor: Arc<StatementExecutor>,
    /// Query engine used to execute prepared logical plans and to describe statements.
    query_engine: QueryEngineRef,
    /// Plugin registry; interceptors, the permission checker and `QueryOptions` are looked
    /// up here.
    plugins: Plugins,
    /// Insert handle, returned by `inserter()`.
    inserter: InserterRef,
    /// Delete handle. NOTE(review): not referenced in this part of the file — confirm usage.
    deleter: DeleterRef,
    /// Table metadata manager, returned by `table_metadata_manager()`.
    table_metadata_manager: TableMetadataManagerRef,
    /// When present, a slow-query timer is started for every SQL and PromQL statement.
    slow_query_recorder: Option<SlowQueryRecorder>,
    /// Optional limiter. NOTE(review): not referenced in this part of the file — confirm usage.
    limiter: Option<LimiterRef>,
}
100
101impl Instance {
102 pub async fn try_build_standalone_components(
103 dir: String,
104 kv_backend_config: KvBackendConfig,
105 procedure_config: ProcedureConfig,
106 ) -> Result<(KvBackendRef, ProcedureManagerRef)> {
107 info!(
108 "Creating metadata kvbackend with config: {:?}",
109 kv_backend_config
110 );
111 let kv_backend = RaftEngineBackend::try_open_with_cfg(dir, &kv_backend_config)
112 .map_err(BoxedError::new)
113 .context(error::OpenRaftEngineBackendSnafu)?;
114
115 let kv_backend = Arc::new(kv_backend);
116 let kv_state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
117
118 let manager_config = ManagerConfig {
119 max_retry_times: procedure_config.max_retry_times,
120 retry_delay: procedure_config.retry_delay,
121 max_running_procedures: procedure_config.max_running_procedures,
122 ..Default::default()
123 };
124 let procedure_manager = Arc::new(LocalManager::new(
125 manager_config,
126 kv_state_store.clone(),
127 kv_state_store,
128 ));
129
130 Ok((kv_backend, procedure_manager))
131 }
132
133 pub fn catalog_manager(&self) -> &CatalogManagerRef {
134 &self.catalog_manager
135 }
136
137 pub fn query_engine(&self) -> &QueryEngineRef {
138 &self.query_engine
139 }
140
141 pub fn plugins(&self) -> &Plugins {
142 &self.plugins
143 }
144
145 pub fn statement_executor(&self) -> &StatementExecutorRef {
146 &self.statement_executor
147 }
148
149 pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
150 &self.table_metadata_manager
151 }
152
153 pub fn inserter(&self) -> &InserterRef {
154 &self.inserter
155 }
156}
157
158fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
159 ParserContext::create_with_dialect(sql, dialect, ParseOptions::default()).context(ParseSqlSnafu)
160}
161
162impl Instance {
163 async fn query_statement(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
164 check_permission(self.plugins.clone(), &stmt, &query_ctx)?;
165
166 let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
167 let query_interceptor = query_interceptor.as_ref();
168
169 let _slow_query_timer = if let Some(recorder) = &self.slow_query_recorder {
170 recorder.start(QueryStatement::Sql(stmt.clone()), query_ctx.clone())
171 } else {
172 None
173 };
174
175 let output = match stmt {
176 Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
177 if let Statement::Explain(explain) = &stmt {
179 if let Some(format) = explain.format() {
180 query_ctx.set_explain_format(format.to_string());
181 }
182 }
183
184 let stmt = QueryStatement::Sql(stmt);
185 let plan = self
186 .statement_executor
187 .plan(&stmt, query_ctx.clone())
188 .await?;
189
190 let QueryStatement::Sql(stmt) = stmt else {
191 unreachable!()
192 };
193 query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?;
194
195 self.statement_executor.exec_plan(plan, query_ctx).await
196 }
197 Statement::Tql(tql) => {
198 let plan = self
199 .statement_executor
200 .plan_tql(tql.clone(), &query_ctx)
201 .await?;
202
203 query_interceptor.pre_execute(
204 &Statement::Tql(tql),
205 Some(&plan),
206 query_ctx.clone(),
207 )?;
208
209 self.statement_executor.exec_plan(plan, query_ctx).await
210 }
211 _ => {
212 query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?;
213
214 self.statement_executor.execute_sql(stmt, query_ctx).await
215 }
216 };
217
218 output.context(TableOperationSnafu)
219 }
220}
221
222#[async_trait]
223impl SqlQueryHandler for Instance {
224 type Error = Error;
225
226 #[tracing::instrument(skip_all)]
227 async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
228 let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
229 let query_interceptor = query_interceptor_opt.as_ref();
230 let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) {
231 Ok(q) => q,
232 Err(e) => return vec![Err(e)],
233 };
234
235 let checker_ref = self.plugins.get::<PermissionCheckerRef>();
236 let checker = checker_ref.as_ref();
237
238 match parse_stmt(query.as_ref(), query_ctx.sql_dialect())
239 .and_then(|stmts| query_interceptor.post_parsing(stmts, query_ctx.clone()))
240 {
241 Ok(stmts) => {
242 let mut results = Vec::with_capacity(stmts.len());
243 for stmt in stmts {
244 if let Err(e) = checker
245 .check_permission(
246 query_ctx.current_user(),
247 PermissionReq::SqlStatement(&stmt),
248 )
249 .context(PermissionSnafu)
250 {
251 results.push(Err(e));
252 break;
253 }
254
255 match self.query_statement(stmt.clone(), query_ctx.clone()).await {
256 Ok(output) => {
257 let output_result =
258 query_interceptor.post_execute(output, query_ctx.clone());
259 results.push(output_result);
260 }
261 Err(e) => {
262 if e.status_code().should_log_error() {
263 error!(e; "Failed to execute query: {stmt}");
264 } else {
265 debug!("Failed to execute query: {stmt}, {e}");
266 }
267 results.push(Err(e));
268 break;
269 }
270 }
271 }
272 results
273 }
274 Err(e) => {
275 vec![Err(e)]
276 }
277 }
278 }
279
280 async fn do_exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
281 self.query_engine
284 .execute(plan.clone(), query_ctx)
285 .await
286 .context(ExecLogicalPlanSnafu)
287 }
288
289 #[tracing::instrument(skip_all)]
290 async fn do_promql_query(
291 &self,
292 query: &PromQuery,
293 query_ctx: QueryContextRef,
294 ) -> Vec<Result<Output>> {
295 let result = PrometheusHandler::do_query(self, query, query_ctx)
297 .await
298 .with_context(|_| ExecutePromqlSnafu {
299 query: format!("{query:?}"),
300 });
301 vec![result]
302 }
303
304 async fn do_describe(
305 &self,
306 stmt: Statement,
307 query_ctx: QueryContextRef,
308 ) -> Result<Option<DescribeResult>> {
309 if matches!(
310 stmt,
311 Statement::Insert(_) | Statement::Query(_) | Statement::Delete(_)
312 ) {
313 self.plugins
314 .get::<PermissionCheckerRef>()
315 .as_ref()
316 .check_permission(query_ctx.current_user(), PermissionReq::SqlStatement(&stmt))
317 .context(PermissionSnafu)?;
318
319 let plan = self
320 .query_engine
321 .planner()
322 .plan(&QueryStatement::Sql(stmt), query_ctx.clone())
323 .await
324 .context(PlanStatementSnafu)?;
325 self.query_engine
326 .describe(plan, query_ctx)
327 .await
328 .map(Some)
329 .context(error::DescribeStatementSnafu)
330 } else {
331 Ok(None)
332 }
333 }
334
335 async fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result<bool> {
336 self.catalog_manager
337 .schema_exists(catalog, schema, None)
338 .await
339 .context(error::CatalogSnafu)
340 }
341}
342
343pub fn attach_timer(output: Output, timer: HistogramTimer) -> Output {
345 match output.data {
346 OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
347 OutputData::Stream(stream) => {
348 let stream = OnDone::new(stream, move || {
349 timer.observe_duration();
350 });
351 Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
352 }
353 }
354}
355
356#[async_trait]
357impl PrometheusHandler for Instance {
358 #[tracing::instrument(skip_all)]
359 async fn do_query(
360 &self,
361 query: &PromQuery,
362 query_ctx: QueryContextRef,
363 ) -> server_error::Result<Output> {
364 let interceptor = self
365 .plugins
366 .get::<PromQueryInterceptorRef<server_error::Error>>();
367
368 self.plugins
369 .get::<PermissionCheckerRef>()
370 .as_ref()
371 .check_permission(query_ctx.current_user(), PermissionReq::PromQuery)
372 .context(AuthSnafu)?;
373
374 let stmt = QueryLanguageParser::parse_promql(query, &query_ctx).with_context(|_| {
375 ParsePromQLSnafu {
376 query: query.clone(),
377 }
378 })?;
379
380 let _slow_query_timer = if let Some(recorder) = &self.slow_query_recorder {
381 recorder.start(stmt.clone(), query_ctx.clone())
382 } else {
383 None
384 };
385
386 let plan = self
387 .statement_executor
388 .plan(&stmt, query_ctx.clone())
389 .await
390 .map_err(BoxedError::new)
391 .context(ExecuteQuerySnafu)?;
392
393 interceptor.pre_execute(query, Some(&plan), query_ctx.clone())?;
394
395 let output = self
396 .statement_executor
397 .exec_plan(plan, query_ctx.clone())
398 .await
399 .map_err(BoxedError::new)
400 .context(ExecuteQuerySnafu)?;
401
402 Ok(interceptor.post_execute(output, query_ctx)?)
403 }
404
405 async fn query_metric_names(
406 &self,
407 matchers: Vec<Matcher>,
408 ctx: &QueryContextRef,
409 ) -> server_error::Result<Vec<String>> {
410 self.handle_query_metric_names(matchers, ctx)
411 .await
412 .map_err(BoxedError::new)
413 .context(ExecuteQuerySnafu)
414 }
415
416 async fn query_label_values(
417 &self,
418 metric: String,
419 label_name: String,
420 matchers: Vec<Matcher>,
421 start: SystemTime,
422 end: SystemTime,
423 ctx: &QueryContextRef,
424 ) -> server_error::Result<Vec<String>> {
425 self.handle_query_label_values(metric, label_name, matchers, start, end, ctx)
426 .await
427 .map_err(BoxedError::new)
428 .context(ExecuteQuerySnafu)
429 }
430
431 fn catalog_manager(&self) -> CatalogManagerRef {
432 self.catalog_manager.clone()
433 }
434}
435
/// Validates the optional `database` field of a statement against the query context.
///
/// When `$stmt.database` is `Some`, the expansion calls `validate_catalog_and_schema` with
/// the context's *current* catalog and the named database, and wraps a failure in the
/// `SqlExecIntercepted` error. When the field is `None`, nothing is checked.
///
/// The expansion contains a `?`, so the macro can only be used inside a function whose
/// error type accepts that error.
macro_rules! validate_db_permission {
    ($stmt: expr, $query_ctx: expr) => {
        if let Some(database) = &$stmt.database {
            validate_catalog_and_schema($query_ctx.current_catalog(), database, $query_ctx)
                .map_err(BoxedError::new)
                .context(SqlExecInterceptedSnafu)?;
        }
    };
}
446
447pub fn check_permission(
448 plugins: Plugins,
449 stmt: &Statement,
450 query_ctx: &QueryContextRef,
451) -> Result<()> {
452 let need_validate = plugins
453 .get::<QueryOptions>()
454 .map(|opts| opts.disallow_cross_catalog_query)
455 .unwrap_or_default();
456
457 if !need_validate {
458 return Ok(());
459 }
460
461 match stmt {
462 Statement::Admin(_) => {}
465 Statement::Query(_)
467 | Statement::Explain(_)
468 | Statement::Tql(_)
469 | Statement::Delete(_)
470 | Statement::DeclareCursor(_)
471 | Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(_)) => {}
472 Statement::CreateDatabase(_)
474 | Statement::ShowDatabases(_)
475 | Statement::DropDatabase(_)
476 | Statement::AlterDatabase(_)
477 | Statement::DropFlow(_)
478 | Statement::Use(_) => {}
479 Statement::ShowCreateDatabase(stmt) => {
480 validate_database(&stmt.database_name, query_ctx)?;
481 }
482 Statement::ShowCreateTable(stmt) => {
483 validate_param(&stmt.table_name, query_ctx)?;
484 }
485 Statement::ShowCreateFlow(stmt) => {
486 validate_param(&stmt.flow_name, query_ctx)?;
487 }
488 Statement::ShowCreateView(stmt) => {
489 validate_param(&stmt.view_name, query_ctx)?;
490 }
491 Statement::CreateExternalTable(stmt) => {
492 validate_param(&stmt.name, query_ctx)?;
493 }
494 Statement::CreateFlow(stmt) => {
495 validate_param(&stmt.sink_table_name, query_ctx)?;
497 }
498 Statement::CreateView(stmt) => {
499 validate_param(&stmt.name, query_ctx)?;
500 }
501 Statement::AlterTable(stmt) => {
502 validate_param(stmt.table_name(), query_ctx)?;
503 }
504 Statement::SetVariables(_) | Statement::ShowVariables(_) => {}
506 Statement::ShowCharset(_) | Statement::ShowCollation(_) => {}
508
509 Statement::Insert(insert) => {
510 let name = insert.table_name().context(ParseSqlSnafu)?;
511 validate_param(name, query_ctx)?;
512 }
513 Statement::CreateTable(stmt) => {
514 validate_param(&stmt.name, query_ctx)?;
515 }
516 Statement::CreateTableLike(stmt) => {
517 validate_param(&stmt.table_name, query_ctx)?;
518 validate_param(&stmt.source_name, query_ctx)?;
519 }
520 Statement::DropTable(drop_stmt) => {
521 for table_name in drop_stmt.table_names() {
522 validate_param(table_name, query_ctx)?;
523 }
524 }
525 Statement::DropView(stmt) => {
526 validate_param(&stmt.view_name, query_ctx)?;
527 }
528 Statement::ShowTables(stmt) => {
529 validate_db_permission!(stmt, query_ctx);
530 }
531 Statement::ShowTableStatus(stmt) => {
532 validate_db_permission!(stmt, query_ctx);
533 }
534 Statement::ShowColumns(stmt) => {
535 validate_db_permission!(stmt, query_ctx);
536 }
537 Statement::ShowIndex(stmt) => {
538 validate_db_permission!(stmt, query_ctx);
539 }
540 Statement::ShowRegion(stmt) => {
541 validate_db_permission!(stmt, query_ctx);
542 }
543 Statement::ShowViews(stmt) => {
544 validate_db_permission!(stmt, query_ctx);
545 }
546 Statement::ShowFlows(stmt) => {
547 validate_db_permission!(stmt, query_ctx);
548 }
549 Statement::ShowStatus(_stmt) => {}
550 Statement::ShowSearchPath(_stmt) => {}
551 Statement::DescribeTable(stmt) => {
552 validate_param(stmt.name(), query_ctx)?;
553 }
554 Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => match stmt {
555 CopyTable::To(copy_table_to) => validate_param(©_table_to.table_name, query_ctx)?,
556 CopyTable::From(copy_table_from) => {
557 validate_param(©_table_from.table_name, query_ctx)?
558 }
559 },
560 Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
561 match copy_database {
562 CopyDatabase::To(stmt) => validate_database(&stmt.database_name, query_ctx)?,
563 CopyDatabase::From(stmt) => validate_database(&stmt.database_name, query_ctx)?,
564 }
565 }
566 Statement::TruncateTable(stmt) => {
567 validate_param(stmt.table_name(), query_ctx)?;
568 }
569 Statement::FetchCursor(_) | Statement::CloseCursor(_) => {}
571 }
572 Ok(())
573}
574
575fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
576 let (catalog, schema, _) = table_idents_to_full_name(name, query_ctx)
577 .map_err(BoxedError::new)
578 .context(ExternalSnafu)?;
579
580 validate_catalog_and_schema(&catalog, &schema, query_ctx)
581 .map_err(BoxedError::new)
582 .context(SqlExecInterceptedSnafu)
583}
584
585fn validate_database(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
586 let (catalog, schema) = match &name.0[..] {
587 [schema] => (
588 query_ctx.current_catalog().to_string(),
589 schema.value.clone(),
590 ),
591 [catalog, schema] => (catalog.value.clone(), schema.value.clone()),
592 _ => InvalidSqlSnafu {
593 err_msg: format!(
594 "expect database name to be <catalog>.<schema> or <schema>, actual: {name}",
595 ),
596 }
597 .fail()?,
598 };
599
600 validate_catalog_and_schema(&catalog, &schema, query_ctx)
601 .map_err(BoxedError::new)
602 .context(SqlExecInterceptedSnafu)
603}
604
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use common_base::Plugins;
    use query::query_engine::options::QueryOptions;
    use session::context::QueryContext;
    use sql::dialect::GreptimeDbDialect;
    use strfmt::Format;

    use super::*;

    /// Fills the `{catalog}` and `{schema}` placeholders of `template`.
    fn render(template: &str, catalog: &str, schema: &str) -> String {
        let mut vars = HashMap::new();
        vars.insert("catalog".to_string(), catalog);
        vars.insert("schema".to_string(), schema);
        template.format(&vars).unwrap()
    }

    /// Parses `sql` and checks its first statement, expecting success iff `is_ok`.
    fn expect_outcome(sql: &str, plugins: Plugins, query_ctx: &QueryContextRef, is_ok: bool) {
        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        let outcome = check_permission(plugins, &stmts[0], query_ctx);
        match is_ok {
            true => {
                outcome.unwrap();
            }
            false => assert!(outcome.is_err()),
        }
    }

    /// Renders `template_sql` with accepted and rejected catalog/schema prefixes and checks
    /// that each rendering is permitted or denied accordingly.
    fn check_template(template_sql: &str, plugins: Plugins, query_ctx: &QueryContextRef) {
        let accepted = [("", ""), ("", "public."), ("greptime.", "public.")];
        for (catalog, schema) in accepted {
            let sql = render(template_sql, catalog, schema);
            expect_outcome(&sql, plugins.clone(), query_ctx, true);
        }

        let rejected = [
            ("wrongcatalog.", "public."),
            ("wrongcatalog.", "wrongschema."),
        ];
        for (catalog, schema) in rejected {
            let sql = render(template_sql, catalog, schema);
            expect_outcome(&sql, plugins.clone(), query_ctx, false);
        }
    }

    /// Parses `sql`, asserts the statement count, and requires every statement to pass.
    fn expect_all_permitted(
        sql: &str,
        expected_len: usize,
        plugins: &Plugins,
        query_ctx: &QueryContextRef,
    ) {
        let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
        assert_eq!(stmts.len(), expected_len);
        for stmt in &stmts {
            check_permission(plugins.clone(), stmt, query_ctx).unwrap();
        }
    }

    #[test]
    fn test_exec_validation() {
        let query_ctx = QueryContext::arc();
        let plugins: Plugins = Plugins::new();
        plugins.insert(QueryOptions {
            disallow_cross_catalog_query: true,
        });

        // Statement kinds that are accepted without catalog validation.
        let sql = r#"
        SELECT * FROM demo;
        EXPLAIN SELECT * FROM demo;
        CREATE DATABASE test_database;
        SHOW DATABASES;
        "#;
        expect_all_permitted(sql, 4, &plugins, &query_ctx);

        // Unqualified table names resolve to the current catalog and schema.
        let sql = r#"
        SHOW CREATE TABLE demo;
        ALTER TABLE demo ADD COLUMN new_col INT;
        "#;
        expect_all_permitted(sql, 2, &plugins, &query_ctx);

        let insert_sql = "INSERT INTO {catalog}{schema}monitor(host) VALUES ('host1');";
        check_template(insert_sql, plugins.clone(), &query_ctx);

        let create_sql = r#"CREATE TABLE {catalog}{schema}demo(
                            host STRING,
                            ts TIMESTAMP,
                            TIME INDEX (ts),
                            PRIMARY KEY(host)
                        ) engine=mito;"#;
        check_template(create_sql, plugins.clone(), &query_ctx);

        let drop_sql = "DROP TABLE {catalog}{schema}demo;";
        check_template(drop_sql, plugins.clone(), &query_ctx);

        let show_public = parse_stmt("SHOW TABLES FROM public", &GreptimeDbDialect {}).unwrap();
        check_permission(plugins.clone(), &show_public[0], &query_ctx).unwrap();

        let show_private = parse_stmt("SHOW TABLES FROM private", &GreptimeDbDialect {}).unwrap();
        let outcome = check_permission(plugins.clone(), &show_private[0], &query_ctx);
        assert!(outcome.is_ok());

        let desc_sql = "DESC TABLE {catalog}{schema}demo;";
        check_template(desc_sql, plugins, &query_ctx);
    }
}