1pub mod builder;
16mod grpc;
17mod influxdb;
18mod jaeger;
19mod log_handler;
20mod logs;
21mod opentsdb;
22mod otlp;
23pub mod prom_store;
24mod promql;
25mod region_query;
26pub mod standalone;
27
28use std::sync::Arc;
29use std::time::SystemTime;
30
31use async_trait::async_trait;
32use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
33use catalog::process_manager::ProcessManagerRef;
34use catalog::CatalogManagerRef;
35use client::OutputData;
36use common_base::cancellation::CancellableFuture;
37use common_base::Plugins;
38use common_config::KvBackendConfig;
39use common_error::ext::{BoxedError, ErrorExt};
40use common_meta::key::runtime_switch::RuntimeSwitchManager;
41use common_meta::key::TableMetadataManagerRef;
42use common_meta::kv_backend::KvBackendRef;
43use common_meta::state_store::KvStateStore;
44use common_procedure::local::{LocalManager, ManagerConfig};
45use common_procedure::options::ProcedureConfig;
46use common_procedure::ProcedureManagerRef;
47use common_query::Output;
48use common_telemetry::{debug, error, info, tracing};
49use datafusion_expr::LogicalPlan;
50use log_store::raft_engine::RaftEngineBackend;
51use operator::delete::DeleterRef;
52use operator::insert::InserterRef;
53use operator::statement::{StatementExecutor, StatementExecutorRef};
54use pipeline::pipeline_operator::PipelineOperator;
55use prometheus::HistogramTimer;
56use promql_parser::label::Matcher;
57use query::metrics::OnDone;
58use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
59use query::query_engine::options::{validate_catalog_and_schema, QueryOptions};
60use query::query_engine::DescribeResult;
61use query::QueryEngineRef;
62use servers::error as server_error;
63use servers::error::{AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu};
64use servers::interceptor::{
65 PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
66};
67use servers::prometheus_handler::PrometheusHandler;
68use servers::query_handler::sql::SqlQueryHandler;
69use session::context::QueryContextRef;
70use session::table_name::table_idents_to_full_name;
71use snafu::prelude::*;
72use sql::dialect::Dialect;
73use sql::parser::{ParseOptions, ParserContext};
74use sql::statements::copy::{CopyDatabase, CopyTable};
75use sql::statements::statement::Statement;
76use sqlparser::ast::ObjectName;
77pub use standalone::StandaloneDatanodeManager;
78
79use crate::error::{
80 self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, InvalidSqlSnafu,
81 ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
82 TableOperationSnafu,
83};
84use crate::limiter::LimiterRef;
85use crate::slow_query_recorder::SlowQueryRecorder;
86use crate::stream_wrapper::CancellableStreamWrapper;
87
88#[derive(Clone)]
92pub struct Instance {
93 catalog_manager: CatalogManagerRef,
94 pipeline_operator: Arc<PipelineOperator>,
95 statement_executor: Arc<StatementExecutor>,
96 query_engine: QueryEngineRef,
97 plugins: Plugins,
98 inserter: InserterRef,
99 deleter: DeleterRef,
100 table_metadata_manager: TableMetadataManagerRef,
101 slow_query_recorder: Option<SlowQueryRecorder>,
102 limiter: Option<LimiterRef>,
103 process_manager: ProcessManagerRef,
104}
105
106impl Instance {
107 pub async fn try_build_standalone_components(
108 dir: String,
109 kv_backend_config: KvBackendConfig,
110 procedure_config: ProcedureConfig,
111 ) -> Result<(KvBackendRef, ProcedureManagerRef)> {
112 info!(
113 "Creating metadata kvbackend with config: {:?}",
114 kv_backend_config
115 );
116 let kv_backend = RaftEngineBackend::try_open_with_cfg(dir, &kv_backend_config)
117 .map_err(BoxedError::new)
118 .context(error::OpenRaftEngineBackendSnafu)?;
119
120 let kv_backend = Arc::new(kv_backend);
121 let kv_state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
122
123 let manager_config = ManagerConfig {
124 max_retry_times: procedure_config.max_retry_times,
125 retry_delay: procedure_config.retry_delay,
126 max_running_procedures: procedure_config.max_running_procedures,
127 ..Default::default()
128 };
129 let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend.clone()));
130 let procedure_manager = Arc::new(LocalManager::new(
131 manager_config,
132 kv_state_store.clone(),
133 kv_state_store,
134 Some(runtime_switch_manager),
135 ));
136
137 Ok((kv_backend, procedure_manager))
138 }
139
140 pub fn catalog_manager(&self) -> &CatalogManagerRef {
141 &self.catalog_manager
142 }
143
144 pub fn query_engine(&self) -> &QueryEngineRef {
145 &self.query_engine
146 }
147
148 pub fn plugins(&self) -> &Plugins {
149 &self.plugins
150 }
151
152 pub fn statement_executor(&self) -> &StatementExecutorRef {
153 &self.statement_executor
154 }
155
156 pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
157 &self.table_metadata_manager
158 }
159
160 pub fn inserter(&self) -> &InserterRef {
161 &self.inserter
162 }
163
164 pub fn process_manager(&self) -> &ProcessManagerRef {
165 &self.process_manager
166 }
167}
168
169fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
170 ParserContext::create_with_dialect(sql, dialect, ParseOptions::default()).context(ParseSqlSnafu)
171}
172
173impl Instance {
174 async fn query_statement(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
175 check_permission(self.plugins.clone(), &stmt, &query_ctx)?;
176
177 let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
178 let query_interceptor = query_interceptor.as_ref();
179
180 let _slow_query_timer = if let Some(recorder) = &self.slow_query_recorder {
181 recorder.start(QueryStatement::Sql(stmt.clone()), query_ctx.clone())
182 } else {
183 None
184 };
185
186 let ticket = self.process_manager.register_query(
187 query_ctx.current_catalog().to_string(),
188 vec![query_ctx.current_schema()],
189 stmt.to_string(),
190 query_ctx.conn_info().to_string(),
191 Some(query_ctx.process_id()),
192 );
193
194 let query_fut = async {
195 match stmt {
196 Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
197 if let Statement::Explain(explain) = &stmt {
199 if let Some(format) = explain.format() {
200 query_ctx.set_explain_format(format.to_string());
201 }
202 }
203
204 let stmt = QueryStatement::Sql(stmt);
205 let plan = self
206 .statement_executor
207 .plan(&stmt, query_ctx.clone())
208 .await?;
209
210 let QueryStatement::Sql(stmt) = stmt else {
211 unreachable!()
212 };
213 query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?;
214 self.statement_executor
215 .exec_plan(plan, query_ctx)
216 .await
217 .context(TableOperationSnafu)
218 }
219 Statement::Tql(tql) => {
220 let plan = self
221 .statement_executor
222 .plan_tql(tql.clone(), &query_ctx)
223 .await?;
224
225 query_interceptor.pre_execute(
226 &Statement::Tql(tql),
227 Some(&plan),
228 query_ctx.clone(),
229 )?;
230 self.statement_executor
231 .exec_plan(plan, query_ctx)
232 .await
233 .context(TableOperationSnafu)
234 }
235 _ => {
236 query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?;
237 self.statement_executor
238 .execute_sql(stmt, query_ctx)
239 .await
240 .context(TableOperationSnafu)
241 }
242 }
243 };
244
245 CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
246 .await
247 .map_err(|_| error::CancelledSnafu.build())?
248 .map(|output| {
249 let Output { meta, data } = output;
250
251 let data = match data {
252 OutputData::Stream(stream) => {
253 OutputData::Stream(Box::pin(CancellableStreamWrapper::new(stream, ticket)))
254 }
255 other => other,
256 };
257 Output { data, meta }
258 })
259 }
260}
261
262#[async_trait]
263impl SqlQueryHandler for Instance {
264 type Error = Error;
265
266 #[tracing::instrument(skip_all)]
267 async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
268 let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
269 let query_interceptor = query_interceptor_opt.as_ref();
270 let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) {
271 Ok(q) => q,
272 Err(e) => return vec![Err(e)],
273 };
274
275 let checker_ref = self.plugins.get::<PermissionCheckerRef>();
276 let checker = checker_ref.as_ref();
277
278 match parse_stmt(query.as_ref(), query_ctx.sql_dialect())
279 .and_then(|stmts| query_interceptor.post_parsing(stmts, query_ctx.clone()))
280 {
281 Ok(stmts) => {
282 let mut results = Vec::with_capacity(stmts.len());
283 for stmt in stmts {
284 if let Err(e) = checker
285 .check_permission(
286 query_ctx.current_user(),
287 PermissionReq::SqlStatement(&stmt),
288 )
289 .context(PermissionSnafu)
290 {
291 results.push(Err(e));
292 break;
293 }
294
295 match self.query_statement(stmt.clone(), query_ctx.clone()).await {
296 Ok(output) => {
297 let output_result =
298 query_interceptor.post_execute(output, query_ctx.clone());
299 results.push(output_result);
300 }
301 Err(e) => {
302 if e.status_code().should_log_error() {
303 error!(e; "Failed to execute query: {stmt}");
304 } else {
305 debug!("Failed to execute query: {stmt}, {e}");
306 }
307 results.push(Err(e));
308 break;
309 }
310 }
311 }
312 results
313 }
314 Err(e) => {
315 vec![Err(e)]
316 }
317 }
318 }
319
320 async fn do_exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
321 self.query_engine
324 .execute(plan.clone(), query_ctx)
325 .await
326 .context(ExecLogicalPlanSnafu)
327 }
328
329 #[tracing::instrument(skip_all)]
330 async fn do_promql_query(
331 &self,
332 query: &PromQuery,
333 query_ctx: QueryContextRef,
334 ) -> Vec<Result<Output>> {
335 let result = PrometheusHandler::do_query(self, query, query_ctx)
337 .await
338 .with_context(|_| ExecutePromqlSnafu {
339 query: format!("{query:?}"),
340 });
341 vec![result]
342 }
343
344 async fn do_describe(
345 &self,
346 stmt: Statement,
347 query_ctx: QueryContextRef,
348 ) -> Result<Option<DescribeResult>> {
349 if matches!(
350 stmt,
351 Statement::Insert(_) | Statement::Query(_) | Statement::Delete(_)
352 ) {
353 self.plugins
354 .get::<PermissionCheckerRef>()
355 .as_ref()
356 .check_permission(query_ctx.current_user(), PermissionReq::SqlStatement(&stmt))
357 .context(PermissionSnafu)?;
358
359 let plan = self
360 .query_engine
361 .planner()
362 .plan(&QueryStatement::Sql(stmt), query_ctx.clone())
363 .await
364 .context(PlanStatementSnafu)?;
365 self.query_engine
366 .describe(plan, query_ctx)
367 .await
368 .map(Some)
369 .context(error::DescribeStatementSnafu)
370 } else {
371 Ok(None)
372 }
373 }
374
375 async fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result<bool> {
376 self.catalog_manager
377 .schema_exists(catalog, schema, None)
378 .await
379 .context(error::CatalogSnafu)
380 }
381}
382
383pub fn attach_timer(output: Output, timer: HistogramTimer) -> Output {
385 match output.data {
386 OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => output,
387 OutputData::Stream(stream) => {
388 let stream = OnDone::new(stream, move || {
389 timer.observe_duration();
390 });
391 Output::new(OutputData::Stream(Box::pin(stream)), output.meta)
392 }
393 }
394}
395
396#[async_trait]
397impl PrometheusHandler for Instance {
398 #[tracing::instrument(skip_all)]
399 async fn do_query(
400 &self,
401 query: &PromQuery,
402 query_ctx: QueryContextRef,
403 ) -> server_error::Result<Output> {
404 let interceptor = self
405 .plugins
406 .get::<PromQueryInterceptorRef<server_error::Error>>();
407
408 self.plugins
409 .get::<PermissionCheckerRef>()
410 .as_ref()
411 .check_permission(query_ctx.current_user(), PermissionReq::PromQuery)
412 .context(AuthSnafu)?;
413
414 let stmt = QueryLanguageParser::parse_promql(query, &query_ctx).with_context(|_| {
415 ParsePromQLSnafu {
416 query: query.clone(),
417 }
418 })?;
419
420 let _slow_query_timer = if let Some(recorder) = &self.slow_query_recorder {
421 recorder.start(stmt.clone(), query_ctx.clone())
422 } else {
423 None
424 };
425
426 let plan = self
427 .statement_executor
428 .plan(&stmt, query_ctx.clone())
429 .await
430 .map_err(BoxedError::new)
431 .context(ExecuteQuerySnafu)?;
432
433 interceptor.pre_execute(query, Some(&plan), query_ctx.clone())?;
434
435 let output = self
436 .statement_executor
437 .exec_plan(plan, query_ctx.clone())
438 .await
439 .map_err(BoxedError::new)
440 .context(ExecuteQuerySnafu)?;
441
442 Ok(interceptor.post_execute(output, query_ctx)?)
443 }
444
445 async fn query_metric_names(
446 &self,
447 matchers: Vec<Matcher>,
448 ctx: &QueryContextRef,
449 ) -> server_error::Result<Vec<String>> {
450 self.handle_query_metric_names(matchers, ctx)
451 .await
452 .map_err(BoxedError::new)
453 .context(ExecuteQuerySnafu)
454 }
455
456 async fn query_label_values(
457 &self,
458 metric: String,
459 label_name: String,
460 matchers: Vec<Matcher>,
461 start: SystemTime,
462 end: SystemTime,
463 ctx: &QueryContextRef,
464 ) -> server_error::Result<Vec<String>> {
465 self.handle_query_label_values(metric, label_name, matchers, start, end, ctx)
466 .await
467 .map_err(BoxedError::new)
468 .context(ExecuteQuerySnafu)
469 }
470
471 fn catalog_manager(&self) -> CatalogManagerRef {
472 self.catalog_manager.clone()
473 }
474}
475
476macro_rules! validate_db_permission {
478 ($stmt: expr, $query_ctx: expr) => {
479 if let Some(database) = &$stmt.database {
480 validate_catalog_and_schema($query_ctx.current_catalog(), database, $query_ctx)
481 .map_err(BoxedError::new)
482 .context(SqlExecInterceptedSnafu)?;
483 }
484 };
485}
486
487pub fn check_permission(
488 plugins: Plugins,
489 stmt: &Statement,
490 query_ctx: &QueryContextRef,
491) -> Result<()> {
492 let need_validate = plugins
493 .get::<QueryOptions>()
494 .map(|opts| opts.disallow_cross_catalog_query)
495 .unwrap_or_default();
496
497 if !need_validate {
498 return Ok(());
499 }
500
501 match stmt {
502 Statement::Admin(_) => {}
505 Statement::Query(_)
507 | Statement::Explain(_)
508 | Statement::Tql(_)
509 | Statement::Delete(_)
510 | Statement::DeclareCursor(_)
511 | Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(_)) => {}
512 Statement::CreateDatabase(_)
514 | Statement::ShowDatabases(_)
515 | Statement::DropDatabase(_)
516 | Statement::AlterDatabase(_)
517 | Statement::DropFlow(_)
518 | Statement::Use(_) => {}
519 #[cfg(feature = "enterprise")]
520 Statement::DropTrigger(_) => {}
521 Statement::ShowCreateDatabase(stmt) => {
522 validate_database(&stmt.database_name, query_ctx)?;
523 }
524 Statement::ShowCreateTable(stmt) => {
525 validate_param(&stmt.table_name, query_ctx)?;
526 }
527 Statement::ShowCreateFlow(stmt) => {
528 validate_param(&stmt.flow_name, query_ctx)?;
529 }
530 Statement::ShowCreateView(stmt) => {
531 validate_param(&stmt.view_name, query_ctx)?;
532 }
533 Statement::CreateExternalTable(stmt) => {
534 validate_param(&stmt.name, query_ctx)?;
535 }
536 Statement::CreateFlow(stmt) => {
537 validate_param(&stmt.sink_table_name, query_ctx)?;
539 }
540 #[cfg(feature = "enterprise")]
541 Statement::CreateTrigger(stmt) => {
542 validate_param(&stmt.trigger_name, query_ctx)?;
543 }
544 Statement::CreateView(stmt) => {
545 validate_param(&stmt.name, query_ctx)?;
546 }
547 Statement::AlterTable(stmt) => {
548 validate_param(stmt.table_name(), query_ctx)?;
549 }
550 Statement::SetVariables(_) | Statement::ShowVariables(_) => {}
552 Statement::ShowCharset(_) | Statement::ShowCollation(_) => {}
554
555 Statement::Insert(insert) => {
556 let name = insert.table_name().context(ParseSqlSnafu)?;
557 validate_param(name, query_ctx)?;
558 }
559 Statement::CreateTable(stmt) => {
560 validate_param(&stmt.name, query_ctx)?;
561 }
562 Statement::CreateTableLike(stmt) => {
563 validate_param(&stmt.table_name, query_ctx)?;
564 validate_param(&stmt.source_name, query_ctx)?;
565 }
566 Statement::DropTable(drop_stmt) => {
567 for table_name in drop_stmt.table_names() {
568 validate_param(table_name, query_ctx)?;
569 }
570 }
571 Statement::DropView(stmt) => {
572 validate_param(&stmt.view_name, query_ctx)?;
573 }
574 Statement::ShowTables(stmt) => {
575 validate_db_permission!(stmt, query_ctx);
576 }
577 Statement::ShowTableStatus(stmt) => {
578 validate_db_permission!(stmt, query_ctx);
579 }
580 Statement::ShowColumns(stmt) => {
581 validate_db_permission!(stmt, query_ctx);
582 }
583 Statement::ShowIndex(stmt) => {
584 validate_db_permission!(stmt, query_ctx);
585 }
586 Statement::ShowRegion(stmt) => {
587 validate_db_permission!(stmt, query_ctx);
588 }
589 Statement::ShowViews(stmt) => {
590 validate_db_permission!(stmt, query_ctx);
591 }
592 Statement::ShowFlows(stmt) => {
593 validate_db_permission!(stmt, query_ctx);
594 }
595 #[cfg(feature = "enterprise")]
596 Statement::ShowTriggers(_stmt) => {
597 }
600 Statement::ShowStatus(_stmt) => {}
601 Statement::ShowSearchPath(_stmt) => {}
602 Statement::DescribeTable(stmt) => {
603 validate_param(stmt.name(), query_ctx)?;
604 }
605 Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => match stmt {
606 CopyTable::To(copy_table_to) => validate_param(©_table_to.table_name, query_ctx)?,
607 CopyTable::From(copy_table_from) => {
608 validate_param(©_table_from.table_name, query_ctx)?
609 }
610 },
611 Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
612 match copy_database {
613 CopyDatabase::To(stmt) => validate_database(&stmt.database_name, query_ctx)?,
614 CopyDatabase::From(stmt) => validate_database(&stmt.database_name, query_ctx)?,
615 }
616 }
617 Statement::TruncateTable(stmt) => {
618 validate_param(stmt.table_name(), query_ctx)?;
619 }
620 Statement::FetchCursor(_) | Statement::CloseCursor(_) => {}
622 Statement::Kill(_) => {}
624 Statement::ShowProcesslist(_) => {}
626 }
627 Ok(())
628}
629
630fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
631 let (catalog, schema, _) = table_idents_to_full_name(name, query_ctx)
632 .map_err(BoxedError::new)
633 .context(ExternalSnafu)?;
634
635 validate_catalog_and_schema(&catalog, &schema, query_ctx)
636 .map_err(BoxedError::new)
637 .context(SqlExecInterceptedSnafu)
638}
639
640fn validate_database(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
641 let (catalog, schema) = match &name.0[..] {
642 [schema] => (
643 query_ctx.current_catalog().to_string(),
644 schema.value.clone(),
645 ),
646 [catalog, schema] => (catalog.value.clone(), schema.value.clone()),
647 _ => InvalidSqlSnafu {
648 err_msg: format!(
649 "expect database name to be <catalog>.<schema> or <schema>, actual: {name}",
650 ),
651 }
652 .fail()?,
653 };
654
655 validate_catalog_and_schema(&catalog, &schema, query_ctx)
656 .map_err(BoxedError::new)
657 .context(SqlExecInterceptedSnafu)
658}
659
660#[cfg(test)]
661mod tests {
662 use std::collections::HashMap;
663
664 use common_base::Plugins;
665 use query::query_engine::options::QueryOptions;
666 use session::context::QueryContext;
667 use sql::dialect::GreptimeDbDialect;
668 use strfmt::Format;
669
670 use super::*;
671
672 #[test]
673 fn test_exec_validation() {
674 let query_ctx = QueryContext::arc();
675 let plugins: Plugins = Plugins::new();
676 plugins.insert(QueryOptions {
677 disallow_cross_catalog_query: true,
678 });
679
680 let sql = r#"
681 SELECT * FROM demo;
682 EXPLAIN SELECT * FROM demo;
683 CREATE DATABASE test_database;
684 SHOW DATABASES;
685 "#;
686 let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
687 assert_eq!(stmts.len(), 4);
688 for stmt in stmts {
689 let re = check_permission(plugins.clone(), &stmt, &query_ctx);
690 re.unwrap();
691 }
692
693 let sql = r#"
694 SHOW CREATE TABLE demo;
695 ALTER TABLE demo ADD COLUMN new_col INT;
696 "#;
697 let stmts = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
698 assert_eq!(stmts.len(), 2);
699 for stmt in stmts {
700 let re = check_permission(plugins.clone(), &stmt, &query_ctx);
701 re.unwrap();
702 }
703
704 fn replace_test(template_sql: &str, plugins: Plugins, query_ctx: &QueryContextRef) {
705 let right = vec![("", ""), ("", "public."), ("greptime.", "public.")];
707 for (catalog, schema) in right {
708 let sql = do_fmt(template_sql, catalog, schema);
709 do_test(&sql, plugins.clone(), query_ctx, true);
710 }
711
712 let wrong = vec![
713 ("wrongcatalog.", "public."),
714 ("wrongcatalog.", "wrongschema."),
715 ];
716 for (catalog, schema) in wrong {
717 let sql = do_fmt(template_sql, catalog, schema);
718 do_test(&sql, plugins.clone(), query_ctx, false);
719 }
720 }
721
722 fn do_fmt(template: &str, catalog: &str, schema: &str) -> String {
723 let vars = HashMap::from([
724 ("catalog".to_string(), catalog),
725 ("schema".to_string(), schema),
726 ]);
727 template.format(&vars).unwrap()
728 }
729
730 fn do_test(sql: &str, plugins: Plugins, query_ctx: &QueryContextRef, is_ok: bool) {
731 let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
732 let re = check_permission(plugins, stmt, query_ctx);
733 if is_ok {
734 re.unwrap();
735 } else {
736 assert!(re.is_err());
737 }
738 }
739
740 let sql = "INSERT INTO {catalog}{schema}monitor(host) VALUES ('host1');";
742 replace_test(sql, plugins.clone(), &query_ctx);
743
744 let sql = r#"CREATE TABLE {catalog}{schema}demo(
746 host STRING,
747 ts TIMESTAMP,
748 TIME INDEX (ts),
749 PRIMARY KEY(host)
750 ) engine=mito;"#;
751 replace_test(sql, plugins.clone(), &query_ctx);
752
753 let sql = "DROP TABLE {catalog}{schema}demo;";
755 replace_test(sql, plugins.clone(), &query_ctx);
756
757 let sql = "SHOW TABLES FROM public";
759 let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
760 check_permission(plugins.clone(), &stmt[0], &query_ctx).unwrap();
761
762 let sql = "SHOW TABLES FROM private";
763 let stmt = parse_stmt(sql, &GreptimeDbDialect {}).unwrap();
764 let re = check_permission(plugins.clone(), &stmt[0], &query_ctx);
765 assert!(re.is_ok());
766
767 let sql = "DESC TABLE {catalog}{schema}demo;";
769 replace_test(sql, plugins, &query_ctx);
770 }
771}