1pub mod builder;
16mod grpc;
17mod influxdb;
18mod jaeger;
19mod log_handler;
20mod logs;
21mod opentsdb;
22mod otlp;
23pub mod prom_store;
24mod promql;
25mod region_query;
26pub mod standalone;
27
28use std::sync::Arc;
29use std::time::SystemTime;
30
31use async_trait::async_trait;
32use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
33use catalog::CatalogManagerRef;
34use client::OutputData;
35use common_base::Plugins;
36use common_config::KvBackendConfig;
37use common_error::ext::{BoxedError, ErrorExt};
38use common_meta::key::TableMetadataManagerRef;
39use common_meta::kv_backend::KvBackendRef;
40use common_meta::state_store::KvStateStore;
41use common_procedure::local::{LocalManager, ManagerConfig};
42use common_procedure::options::ProcedureConfig;
43use common_procedure::ProcedureManagerRef;
44use common_query::Output;
45use common_telemetry::{debug, error, info, tracing};
46use datafusion_expr::LogicalPlan;
47use log_store::raft_engine::RaftEngineBackend;
48use operator::delete::DeleterRef;
49use operator::insert::InserterRef;
50use operator::statement::{StatementExecutor, StatementExecutorRef};
51use pipeline::pipeline_operator::PipelineOperator;
52use prometheus::HistogramTimer;
53use promql_parser::label::Matcher;
54use query::metrics::OnDone;
55use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
56use query::query_engine::options::{validate_catalog_and_schema, QueryOptions};
57use query::query_engine::DescribeResult;
58use query::stats::StatementStatistics;
59use query::QueryEngineRef;
60use servers::error as server_error;
61use servers::error::{AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu};
62use servers::interceptor::{
63 PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
64};
65use servers::prometheus_handler::PrometheusHandler;
66use servers::query_handler::sql::SqlQueryHandler;
67use session::context::QueryContextRef;
68use session::table_name::table_idents_to_full_name;
69use snafu::prelude::*;
70use sql::dialect::Dialect;
71use sql::parser::{ParseOptions, ParserContext};
72use sql::statements::copy::{CopyDatabase, CopyTable};
73use sql::statements::statement::Statement;
74use sqlparser::ast::ObjectName;
75pub use standalone::StandaloneDatanodeManager;
76
77use crate::error::{
78 self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, InvalidSqlSnafu,
79 ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
80 TableOperationSnafu,
81};
82use crate::limiter::LimiterRef;
83
84#[derive(Clone)]
88pub struct Instance {
89 catalog_manager: CatalogManagerRef,
90 pipeline_operator: Arc<PipelineOperator>,
91 statement_executor: Arc<StatementExecutor>,
92 query_engine: QueryEngineRef,
93 plugins: Plugins,
94 inserter: InserterRef,
95 deleter: DeleterRef,
96 table_metadata_manager: TableMetadataManagerRef,
97 stats: StatementStatistics,
98 limiter: Option<LimiterRef>,
99}
100
101impl Instance {
102 pub async fn try_build_standalone_components(
103 dir: String,
104 kv_backend_config: KvBackendConfig,
105 procedure_config: ProcedureConfig,
106 ) -> Result<(KvBackendRef, ProcedureManagerRef)> {
107 info!(
108 "Creating metadata kvbackend with config: {:?}",
109 kv_backend_config
110 );
111 let kv_backend = RaftEngineBackend::try_open_with_cfg(dir, &kv_backend_config)
112 .map_err(BoxedError::new)
113 .context(error::OpenRaftEngineBackendSnafu)?;
114
115 let kv_backend = Arc::new(kv_backend);
116 let kv_state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
117
118 let manager_config = ManagerConfig {
119 max_retry_times: procedure_config.max_retry_times,
120 retry_delay: procedure_config.retry_delay,
121 max_running_procedures: procedure_config.max_running_procedures,
122 ..Default::default()
123 };
124 let procedure_manager = Arc::new(LocalManager::new(
125 manager_config,
126 kv_state_store.clone(),
127 kv_state_store,
128 ));
129
130 Ok((kv_backend, procedure_manager))
131 }
132
133 pub fn catalog_manager(&self) -> &CatalogManagerRef {
134 &self.catalog_manager
135 }
136
137 pub fn query_engine(&self) -> &QueryEngineRef {
138 &self.query_engine
139 }
140
141 pub fn plugins(&self) -> &Plugins {
142 &self.plugins
143 }
144
145 pub fn statement_executor(&self) -> &StatementExecutorRef {
146 &self.statement_executor
147 }
148
149 pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
150 &self.table_metadata_manager
151 }
152
153 pub fn inserter(&self) -> &InserterRef {
154 &self.inserter
155 }
156}
157
158fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
159 ParserContext::create_with_dialect(sql, dialect, ParseOptions::default()).context(ParseSqlSnafu)
160}
161
162impl Instance {
163 async fn query_statement(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
164 check_permission(self.plugins.clone(), &stmt, &query_ctx)?;
165
166 let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
167 let query_interceptor = query_interceptor.as_ref();
168
169 let _slow_query_timer = self
170 .stats
171 .start_slow_query_timer(QueryStatement::Sql(stmt.clone()));
172
173 let output = match stmt {
174 Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
175 if let Statement::Explain(explain) = &stmt {
177 if let Some(format) = explain.format() {
178 query_ctx.set_explain_format(format.to_string());
179 }
180 }
181
182 let stmt = QueryStatement::Sql(stmt);
183 let plan = self
184 .statement_executor
185 .plan(&stmt, query_ctx.clone())
186 .await?;
187
188 let QueryStatement::Sql(stmt) = stmt else {
189 unreachable!()
190 };
191 query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?;
192
193 self.statement_executor.exec_plan(plan, query_ctx).await
194 }
195 Statement::Tql(tql) => {
196 let plan = self
197 .statement_executor
198 .plan_tql(tql.clone(), &query_ctx)
199 .await?;
200
201 query_interceptor.pre_execute(
202 &Statement::Tql(tql),
203 Some(&plan),
204 query_ctx.clone(),
205 )?;
206
207 self.statement_executor.exec_plan(plan, query_ctx).await
208 }
209 _ => {
210 query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?;
211
212 self.statement_executor.execute_sql(stmt, query_ctx).await
213 }
214 };
215 output.context(TableOperationSnafu)
216 }
217}
218
219#[async_trait]
220impl SqlQueryHandler for Instance {
221 type Error = Error;
222
223 #[tracing::instrument(skip_all)]
224 async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
225 let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
226 let query_interceptor = query_interceptor_opt.as_ref();
227 let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) {
228 Ok(q) => q,
229 Err(e) => return vec![Err(e)],
230 };
231
232 let checker_ref = self.plugins.get::<PermissionCheckerRef>();
233 let checker = checker_ref.as_ref();
234
235 match parse_stmt(query.as_ref(), query_ctx.sql_dialect())
236 .and_then(|stmts| query_interceptor.post_parsing(stmts, query_ctx.clone()))
237 {
238 Ok(stmts) => {
239 let mut results = Vec::with_capacity(stmts.len());
240 for stmt in stmts {
241 if let Err(e) = checker
242 .check_permission(
243 query_ctx.current_user(),
244 PermissionReq::SqlStatement(&stmt),
245 )
246 .context(PermissionSnafu)
247 {
248 results.push(Err(e));
249 break;
250 }
251
252 match self.query_statement(stmt.clone(), query_ctx.clone()).await {
253 Ok(output) => {
254 let output_result =
255 query_interceptor.post_execute(output, query_ctx.clone());
256 results.push(output_result);
257 }
258 Err(e) => {
259 if e.status_code().should_log_error() {
260 error!(e; "Failed to execute query: {stmt}");
261 } else {
262 debug!("Failed to execute query: {stmt}, {e}");
263 }
264 results.push(Err(e));
265 break;
266 }
267 }
268 }
269 results
270 }
271 Err(e) => {
272 vec![Err(e)]
273 }
274 }
275 }
276
277 async fn do_exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
278 self.query_engine
281 .execute(plan.clone(), query_ctx)
282 .await
283 .context(ExecLogicalPlanSnafu)
284 }
285
286 #[tracing::instrument(skip_all)]
287 async fn do_promql_query(
288 &self,
289 query: &PromQuery,
290 query_ctx: QueryContextRef,
291 ) -> Vec<Result<Output>> {
292 let result = PrometheusHandler::do_query(self, query, query_ctx)
294 .await
295 .with_context(|_| ExecutePromqlSnafu {
296 query: format!("{query:?}"),
297 });
298 vec![result]
299 }
300
301 async fn do_describe(
302 &self,
303 stmt: Statement,
304 query_ctx: QueryContextRef,
305 ) -> Result<Option<DescribeResult>> {
306 if matches!(
307 stmt,
308 Statement::Insert(_) | Statement::Query(_) | Statement::Delete(_)
309 ) {
310 self.plugins
311 .get::<PermissionCheckerRef>()
312 .as_ref()
313 .check_permission(query_ctx.current_user(), PermissionReq::SqlStatement(&stmt))
314 .context(PermissionSnafu)?;
315
316 let plan = self
317 .query_engine
318 .planner()
319 .plan(&QueryStatement::Sql(stmt), query_ctx.clone())
320 .await
321 .context(PlanStatementSnafu)?;
322 self.query_engine
323 .describe(plan, query_ctx)
324 .await
325 .map(Some)
326 .context(error::DescribeStatementSnafu)
327 } else {
328 Ok(None)
329 }
330 }
331
332 async fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result<bool> {
333 self.catalog_manager
334 .schema_exists(catalog, schema, None)
335 .await
336 .context(error::CatalogSnafu)
337 }
338}
339
340pub fn attach_timer(output: Output, timer: HistogramTimer) -> Output {
342 match output.data {
343 OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
344 OutputData::Stream(stream) => {
345 let stream = OnDone::new(stream, move || {
346 timer.observe_duration();
347 });
348 Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
349 }
350 }
351}
352
353#[async_trait]
354impl PrometheusHandler for Instance {
355 #[tracing::instrument(skip_all)]
356 async fn do_query(
357 &self,
358 query: &PromQuery,
359 query_ctx: QueryContextRef,
360 ) -> server_error::Result<Output> {
361 let interceptor = self
362 .plugins
363 .get::<PromQueryInterceptorRef<server_error::Error>>();
364
365 self.plugins
366 .get::<PermissionCheckerRef>()
367 .as_ref()
368 .check_permission(query_ctx.current_user(), PermissionReq::PromQuery)
369 .context(AuthSnafu)?;
370
371 let stmt = QueryLanguageParser::parse_promql(query, &query_ctx).with_context(|_| {
372 ParsePromQLSnafu {
373 query: query.clone(),
374 }
375 })?;
376
377 let _slow_query_timer = self.stats.start_slow_query_timer(stmt.clone());
378
379 let plan = self
380 .statement_executor
381 .plan(&stmt, query_ctx.clone())
382 .await
383 .map_err(BoxedError::new)
384 .context(ExecuteQuerySnafu)?;
385
386 interceptor.pre_execute(query, Some(&plan), query_ctx.clone())?;
387
388 let output = self
389 .statement_executor
390 .exec_plan(plan, query_ctx.clone())
391 .await
392 .map_err(BoxedError::new)
393 .context(ExecuteQuerySnafu)?;
394
395 Ok(interceptor.post_execute(output, query_ctx)?)
396 }
397
398 async fn query_metric_names(
399 &self,
400 matchers: Vec<Matcher>,
401 ctx: &QueryContextRef,
402 ) -> server_error::Result<Vec<String>> {
403 self.handle_query_metric_names(matchers, ctx)
404 .await
405 .map_err(BoxedError::new)
406 .context(ExecuteQuerySnafu)
407 }
408
409 async fn query_label_values(
410 &self,
411 metric: String,
412 label_name: String,
413 matchers: Vec<Matcher>,
414 start: SystemTime,
415 end: SystemTime,
416 ctx: &QueryContextRef,
417 ) -> server_error::Result<Vec<String>> {
418 self.handle_query_label_values(metric, label_name, matchers, start, end, ctx)
419 .await
420 .map_err(BoxedError::new)
421 .context(ExecuteQuerySnafu)
422 }
423
424 fn catalog_manager(&self) -> CatalogManagerRef {
425 self.catalog_manager.clone()
426 }
427}
428
429macro_rules! validate_db_permission {
431 ($stmt: expr, $query_ctx: expr) => {
432 if let Some(database) = &$stmt.database {
433 validate_catalog_and_schema($query_ctx.current_catalog(), database, $query_ctx)
434 .map_err(BoxedError::new)
435 .context(SqlExecInterceptedSnafu)?;
436 }
437 };
438}
439
440pub fn check_permission(
441 plugins: Plugins,
442 stmt: &Statement,
443 query_ctx: &QueryContextRef,
444) -> Result<()> {
445 let need_validate = plugins
446 .get::<QueryOptions>()
447 .map(|opts| opts.disallow_cross_catalog_query)
448 .unwrap_or_default();
449
450 if !need_validate {
451 return Ok(());
452 }
453
454 match stmt {
455 Statement::Admin(_) => {}
458 Statement::Query(_)
460 | Statement::Explain(_)
461 | Statement::Tql(_)
462 | Statement::Delete(_)
463 | Statement::DeclareCursor(_)
464 | Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(_)) => {}
465 Statement::CreateDatabase(_)
467 | Statement::ShowDatabases(_)
468 | Statement::DropDatabase(_)
469 | Statement::AlterDatabase(_)
470 | Statement::DropFlow(_)
471 | Statement::Use(_) => {}
472 Statement::ShowCreateDatabase(stmt) => {
473 validate_database(&stmt.database_name, query_ctx)?;
474 }
475 Statement::ShowCreateTable(stmt) => {
476 validate_param(&stmt.table_name, query_ctx)?;
477 }
478 Statement::ShowCreateFlow(stmt) => {
479 validate_param(&stmt.flow_name, query_ctx)?;
480 }
481 Statement::ShowCreateView(stmt) => {
482 validate_param(&stmt.view_name, query_ctx)?;
483 }
484 Statement::CreateExternalTable(stmt) => {
485 validate_param(&stmt.name, query_ctx)?;
486 }
487 Statement::CreateFlow(stmt) => {
488 validate_param(&stmt.sink_table_name, query_ctx)?;
490 }
491 Statement::CreateView(stmt) => {
492 validate_param(&stmt.name, query_ctx)?;
493 }
494 Statement::AlterTable(stmt) => {
495 validate_param(stmt.table_name(), query_ctx)?;
496 }
497 Statement::SetVariables(_) | Statement::ShowVariables(_) => {}
499 Statement::ShowCharset(_) | Statement::ShowCollation(_) => {}
501
502 Statement::Insert(insert) => {
503 let name = insert.table_name().context(ParseSqlSnafu)?;
504 validate_param(name, query_ctx)?;
505 }
506 Statement::CreateTable(stmt) => {
507 validate_param(&stmt.name, query_ctx)?;
508 }
509 Statement::CreateTableLike(stmt) => {
510 validate_param(&stmt.table_name, query_ctx)?;
511 validate_param(&stmt.source_name, query_ctx)?;
512 }
513 Statement::DropTable(drop_stmt) => {
514 for table_name in drop_stmt.table_names() {
515 validate_param(table_name, query_ctx)?;
516 }
517 }
518 Statement::DropView(stmt) => {
519 validate_param(&stmt.view_name, query_ctx)?;
520 }
521 Statement::ShowTables(stmt) => {
522 validate_db_permission!(stmt, query_ctx);
523 }
524 Statement::ShowTableStatus(stmt) => {
525 validate_db_permission!(stmt, query_ctx);
526 }
527 Statement::ShowColumns(stmt) => {
528 validate_db_permission!(stmt, query_ctx);
529 }
530 Statement::ShowIndex(stmt) => {
531 validate_db_permission!(stmt, query_ctx);
532 }
533 Statement::ShowRegion(stmt) => {
534 validate_db_permission!(stmt, query_ctx);
535 }
536 Statement::ShowViews(stmt) => {
537 validate_db_permission!(stmt, query_ctx);
538 }
539 Statement::ShowFlows(stmt) => {
540 validate_db_permission!(stmt, query_ctx);
541 }
542 Statement::ShowStatus(_stmt) => {}
543 Statement::ShowSearchPath(_stmt) => {}
544 Statement::DescribeTable(stmt) => {
545 validate_param(stmt.name(), query_ctx)?;
546 }
547 Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => match stmt {
548 CopyTable::To(copy_table_to) => validate_param(©_table_to.table_name, query_ctx)?,
549 CopyTable::From(copy_table_from) => {
550 validate_param(©_table_from.table_name, query_ctx)?
551 }
552 },
553 Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
554 match copy_database {
555 CopyDatabase::To(stmt) => validate_database(&stmt.database_name, query_ctx)?,
556 CopyDatabase::From(stmt) => validate_database(&stmt.database_name, query_ctx)?,
557 }
558 }
559 Statement::TruncateTable(stmt) => {
560 validate_param(stmt.table_name(), query_ctx)?;
561 }
562 Statement::FetchCursor(_) | Statement::CloseCursor(_) => {}
564 }
565 Ok(())
566}
567
568fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
569 let (catalog, schema, _) = table_idents_to_full_name(name, query_ctx)
570 .map_err(BoxedError::new)
571 .context(ExternalSnafu)?;
572
573 validate_catalog_and_schema(&catalog, &schema, query_ctx)
574 .map_err(BoxedError::new)
575 .context(SqlExecInterceptedSnafu)
576}
577
578fn validate_database(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
579 let (catalog, schema) = match &name.0[..] {
580 [schema] => (
581 query_ctx.current_catalog().to_string(),
582 schema.value.clone(),
583 ),
584 [catalog, schema] => (catalog.value.clone(), schema.value.clone()),
585 _ => InvalidSqlSnafu {
586 err_msg: format!(
587 "expect database name to be <catalog>.<schema> or <schema>, actual: {name}",
588 ),
589 }
590 .fail()?,
591 };
592
593 validate_catalog_and_schema(&catalog, &schema, query_ctx)
594 .map_err(BoxedError::new)
595 .context(SqlExecInterceptedSnafu)
596}
597
598#[cfg(test)]
599mod tests {
600 use std::collections::HashMap;
601
602 use common_base::Plugins;
603 use query::query_engine::options::QueryOptions;
604 use session::context::QueryContext;
605 use sql::dialect::GreptimeDbDialect;
606 use strfmt::Format;
607
608 use super::*;
609
610 #[test]
611 fn test_exec_validation() {
612 let query_ctx = QueryContext::arc();
613 let plugins: Plugins = Plugins::new();
614 plugins.insert(QueryOptions {
615 disallow_cross_catalog_query: true,
616 });
617
618 let sql = r#"
619 SELECT * FROM demo;
620 EXPLAIN SELECT * FROM demo;
621 CREATE DATABASE test_database;
622 SHOW DATABASES;
623 "#;
624 let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
625 assert_eq!(stmts.len(), 4);
626 for stmt in stmts {
627 let re = check_permission(plugins.clone(), &stmt, &query_ctx);
628 re.unwrap();
629 }
630
631 let sql = r#"
632 SHOW CREATE TABLE demo;
633 ALTER TABLE demo ADD COLUMN new_col INT;
634 "#;
635 let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
636 assert_eq!(stmts.len(), 2);
637 for stmt in stmts {
638 let re = check_permission(plugins.clone(), &stmt, &query_ctx);
639 re.unwrap();
640 }
641
642 fn replace_test(template_sql: &str, plugins: Plugins, query_ctx: &QueryContextRef) {
643 let right = vec![("", ""), ("", "public."), ("greptime.", "public.")];
645 for (catalog, schema) in right {
646 let sql = do_fmt(template_sql, catalog, schema);
647 do_test(&sql, plugins.clone(), query_ctx, true);
648 }
649
650 let wrong = vec![
651 ("wrongcatalog.", "public."),
652 ("wrongcatalog.", "wrongschema."),
653 ];
654 for (catalog, schema) in wrong {
655 let sql = do_fmt(template_sql, catalog, schema);
656 do_test(&sql, plugins.clone(), query_ctx, false);
657 }
658 }
659
660 fn do_fmt(template: &str, catalog: &str, schema: &str) -> String {
661 let vars = HashMap::from([
662 ("catalog".to_string(), catalog),
663 ("schema".to_string(), schema),
664 ]);
665 template.format(&vars).unwrap()
666 }
667
668 fn do_test(sql: &str, plugins: Plugins, query_ctx: &QueryContextRef, is_ok: bool) {
669 let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
670 let re = check_permission(plugins, stmt, query_ctx);
671 if is_ok {
672 re.unwrap();
673 } else {
674 assert!(re.is_err());
675 }
676 }
677
678 let sql = "INSERT INTO {catalog}{schema}monitor(host) VALUES ('host1');";
680 replace_test(sql, plugins.clone(), &query_ctx);
681
682 let sql = r#"CREATE TABLE {catalog}{schema}demo(
684 host STRING,
685 ts TIMESTAMP,
686 TIME INDEX (ts),
687 PRIMARY KEY(host)
688 ) engine=mito;"#;
689 replace_test(sql, plugins.clone(), &query_ctx);
690
691 let sql = "DROP TABLE {catalog}{schema}demo;";
693 replace_test(sql, plugins.clone(), &query_ctx);
694
695 let sql = "SHOW TABLES FROM public";
697 let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
698 check_permission(plugins.clone(), &stmt[0], &query_ctx).unwrap();
699
700 let sql = "SHOW TABLES FROM private";
701 let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
702 let re = check_permission(plugins.clone(), &stmt[0], &query_ctx);
703 assert!(re.is_ok());
704
705 let sql = "DESC TABLE {catalog}{schema}demo;";
707 replace_test(sql, plugins, &query_ctx);
708 }
709}