#[cfg(test)]
mod tests {
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::atomic::AtomicU32;
use std::sync::Arc;
use api::v1::region::QueryRequest;
use client::OutputData;
use common_base::Plugins;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::key::table_name::TableNameKey;
use common_meta::rpc::router::region_distribution;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::debug;
use datafusion_expr::LogicalPlan;
use frontend::error::{self, Error, Result};
use frontend::instance::Instance;
use query::parser::QueryLanguageParser;
use query::query_engine::DefaultSerializer;
use servers::interceptor::{SqlQueryInterceptor, SqlQueryInterceptorRef};
use servers::query_handler::sql::SqlQueryHandler;
use session::context::{QueryContext, QueryContextRef};
use sql::statements::statement::Statement;
use store_api::storage::RegionId;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use crate::standalone::GreptimeDbStandaloneBuilder;
use crate::tests;
use crate::tests::MockDistributedInstance;
/// Runs the create / insert / query / drop cycle for the `demo` table against a standalone
/// instance built by [`GreptimeDbStandaloneBuilder`].
#[tokio::test(flavor = "multi_thread")]
async fn test_standalone_exec_sql() {
    // Statement text is passed to the server verbatim, so its lines stay unindented.
    let create_demo = r#"
CREATE TABLE demo(
host STRING,
ts TIMESTAMP,
cpu DOUBLE NULL,
memory DOUBLE NULL,
disk_util DOUBLE DEFAULT 9.9,
TIME INDEX (ts),
PRIMARY KEY(host)
) engine=mito"#;

    let db = GreptimeDbStandaloneBuilder::new("test_standalone_exec_sql")
        .build()
        .await;
    let frontend = db.fe_instance();

    create_table(frontend, create_demo).await;
    insert_and_query(frontend).await;
    drop_table(frontend).await;
}
/// Creates `demo` partitioned on `host` into four ranges on a distributed instance, runs the
/// shared insert/query check, verifies which row each of the regions 0 to 3 holds, and finally
/// drops the table and checks that it is gone.
#[tokio::test(flavor = "multi_thread")]
async fn test_distributed_exec_sql() {
    common_telemetry::init_default_ut_logging();

    let distributed = tests::create_distributed_instance("test_distributed_exec_sql").await;
    let frontend = distributed.frontend();
    let instance = frontend.as_ref();

    let sql = r#"
CREATE TABLE demo(
host STRING,
ts TIMESTAMP,
cpu DOUBLE NULL,
memory DOUBLE NULL,
disk_util DOUBLE DEFAULT 9.9,
TIME INDEX (ts),
PRIMARY KEY(host)
)
PARTITION ON COLUMNS (host) (
host < '550-A',
host >= '550-A' AND host < '550-W',
host >= '550-W' AND host < 'MOSS',
host >= 'MOSS'
)
engine=mito"#;
    create_table(instance, sql).await;

    insert_and_query(instance).await;

    // The expected tables are compared byte for byte with the pretty-printed record batches,
    // so every cell is padded to the column width given by the border rows. The table lines
    // must start at column 0 because only the first line follows a `\` continuation.
    verify_data_distribution(
        &distributed,
        HashMap::from([
            (
                0u32,
                "\
+---------------------+------+
| ts                  | host |
+---------------------+------+
| 2013-12-31T16:00:00 | 490  |
+---------------------+------+",
            ),
            (
                1u32,
                "\
+---------------------+-------+
| ts                  | host  |
+---------------------+-------+
| 2022-12-31T16:00:00 | 550-A |
+---------------------+-------+",
            ),
            (
                2u32,
                "\
+---------------------+-------+
| ts                  | host  |
+---------------------+-------+
| 2023-12-31T16:00:00 | 550-W |
+---------------------+-------+",
            ),
            (
                3u32,
                "\
+---------------------+------+
| ts                  | host |
+---------------------+------+
| 2043-12-31T16:00:00 | MOSS |
+---------------------+------+",
            ),
        ]),
    )
    .await;

    drop_table(instance).await;

    verify_table_is_dropped(&distributed).await;
}
/// Executes `sql` on `instance` with a fresh [`QueryContext`] and returns the output of the
/// first result.
///
/// # Panics
///
/// Panics if the handler returns no result or if the first result is an error.
async fn query(instance: &Instance, sql: &str) -> Output {
    let query_ctx = QueryContext::arc();
    let mut results = SqlQueryHandler::do_query(instance, sql, query_ctx).await;
    let first = results.remove(0);
    first.unwrap()
}
/// Runs the statement in `sql` through [`query`] and asserts that it reports zero affected rows.
async fn create_table(instance: &Instance, sql: &str) {
    let output = query(instance, sql).await;
    match output.data {
        OutputData::AffectedRows(affected) => assert_eq!(affected, 0),
        _ => unreachable!(),
    }
}
/// Inserts four rows into `demo`, selects them back ordered by `host`, and compares the
/// pretty-printed result with the expected table.
///
/// The `INSERT` does not supply `disk_util`; the expected rows show `9.9` for that column.
///
/// # Panics
///
/// Panics if the insert does not report four affected rows, if the select does not produce a
/// stream, or if the rendered table differs from the expectation.
async fn insert_and_query(instance: &Instance) {
    let sql = r#"INSERT INTO demo(host, cpu, memory, ts) VALUES
('490', 0.1, 1, 1388505600000),
('550-A', 1, 100, 1672502400000),
('550-W', 10000, 1000000, 1704038400000),
('MOSS', 100000000, 10000000000, 2335190400000)
"#;
    let output = query(instance, sql).await;
    let OutputData::AffectedRows(x) = output.data else {
        unreachable!()
    };
    assert_eq!(x, 4);

    let sql = "SELECT * FROM demo WHERE ts > cast(1000000000 as timestamp) ORDER BY host";
    let output = query(instance, sql).await;
    let OutputData::Stream(s) = output.data else {
        unreachable!()
    };
    let batches = common_recordbatch::util::collect_batches(s).await.unwrap();
    let pretty_print = batches.pretty_print().unwrap();

    // The comparison is byte for byte, so each cell is padded to the column width given by the
    // border rows. The table lines must start at column 0 because only the first line follows
    // a `\` continuation.
    let expected = "\
+-------+---------------------+-------------+---------------+-----------+
| host  | ts                  | cpu         | memory        | disk_util |
+-------+---------------------+-------------+---------------+-----------+
| 490   | 2013-12-31T16:00:00 | 0.1         | 1.0           | 9.9       |
| 550-A | 2022-12-31T16:00:00 | 1.0         | 100.0         | 9.9       |
| 550-W | 2023-12-31T16:00:00 | 10000.0     | 1000000.0     | 9.9       |
| MOSS  | 2043-12-31T16:00:00 | 100000000.0 | 10000000000.0 | 9.9       |
+-------+---------------------+-------------+---------------+-----------+";
    assert_eq!(pretty_print, expected);
}
/// Reads the `demo` table directly from datanode regions and compares each result with
/// `expected_distribution`.
///
/// `expected_distribution` maps a region number to the pretty-printed output expected when
/// `SELECT ts, host FROM demo ORDER BY ts` is executed on that region alone.
///
/// # Panics
///
/// Panics if any metadata lookup, planning, encoding or read step fails, if a checked region
/// has no entry in `expected_distribution`, or if a region's output differs from its entry.
async fn verify_data_distribution(
    instance: &MockDistributedInstance,
    expected_distribution: HashMap<u32, &str>,
) {
    let metadata = instance.table_metadata_manager();

    let demo_key = TableNameKey::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "demo");
    let table_id = metadata
        .table_name_manager()
        .get(demo_key)
        .await
        .unwrap()
        .unwrap()
        .table_id();
    debug!("Reading table {table_id}");

    let route = metadata
        .table_route_manager()
        .table_route_storage()
        .get(table_id)
        .await
        .unwrap()
        .unwrap();
    let region_routes = route
        .region_routes()
        .expect("region routes should be physical");

    // NOTE(review): only the first region listed for each datanode is recorded here, so a
    // datanode that hosts several regions has just one of them checked below.
    let region_to_dn_map: HashMap<u32, u64> = region_distribution(region_routes)
        .iter()
        .map(|(datanode_id, regions)| (regions[0], *datanode_id))
        .collect();
    assert!(region_to_dn_map.len() <= instance.datanodes().len());

    // Plan the query once on the frontend; the encoded plan is then sent to each region.
    let stmt = QueryLanguageParser::parse_sql(
        "SELECT ts, host FROM demo ORDER BY ts",
        &QueryContext::arc(),
    )
    .unwrap();
    let logical_plan = instance
        .frontend()
        .statement_executor()
        .plan(&stmt, QueryContext::arc())
        .await
        .unwrap();
    let encoded_plan = DFLogicalSubstraitConvertor
        .encode(&logical_plan, DefaultSerializer)
        .unwrap();

    for (region_number, datanode_id) in &region_to_dn_map {
        let region_server = instance
            .datanodes()
            .get(datanode_id)
            .unwrap()
            .region_server();
        let region_id = RegionId::new(table_id, *region_number);
        let request = QueryRequest {
            region_id: region_id.as_u64(),
            plan: encoded_plan.to_vec(),
            ..Default::default()
        };

        let stream = region_server.handle_remote_read(request).await.unwrap();
        let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
        let actual = recordbatches.pretty_print().unwrap();

        let expected = expected_distribution.get(region_number).unwrap();
        assert_eq!(&actual, expected);
    }
}
/// Issues `DROP TABLE demo` through [`query`] and asserts that it reports zero affected rows.
async fn drop_table(instance: &Instance) {
    let output = query(instance, "DROP TABLE demo").await;
    match output.data {
        OutputData::AffectedRows(affected) => assert_eq!(affected, 0),
        _ => unreachable!(),
    }
}
async fn verify_table_is_dropped(instance: &MockDistributedInstance) {
assert!(instance
.frontend()
.catalog_manager()
.table("greptime", "public", "demo", None)
.await
.unwrap()
.is_none())
}
#[tokio::test(flavor = "multi_thread")]
async fn test_sql_interceptor_plugin() {
#[derive(Default)]
struct AssertionHook {
pub(crate) c: AtomicU32,
}
impl SqlQueryInterceptor for AssertionHook {
type Error = Error;
fn pre_parsing<'a>(
&self,
query: &'a str,
_query_ctx: QueryContextRef,
) -> Result<Cow<'a, str>> {
let _ = self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
assert!(query.starts_with("CREATE TABLE demo"));
Ok(Cow::Borrowed(query))
}
fn post_parsing(
&self,
statements: Vec<Statement>,
_query_ctx: QueryContextRef,
) -> Result<Vec<Statement>> {
let _ = self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
assert!(matches!(statements[0], Statement::CreateTable(_)));
Ok(statements)
}
fn pre_execute(
&self,
_statement: &Statement,
_plan: Option<&LogicalPlan>,
_query_ctx: QueryContextRef,
) -> Result<()> {
let _ = self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Ok(())
}
fn post_execute(
&self,
mut output: Output,
_query_ctx: QueryContextRef,
) -> Result<Output> {
let _ = self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
match &mut output.data {
OutputData::AffectedRows(rows) => {
assert_eq!(*rows, 0);
*rows = 10;
}
_ => unreachable!(),
}
Ok(output)
}
}
let plugins = Plugins::new();
let counter_hook = Arc::new(AssertionHook::default());
plugins.insert::<SqlQueryInterceptorRef<Error>>(counter_hook.clone());
let standalone = GreptimeDbStandaloneBuilder::new("test_sql_interceptor_plugin")
.with_plugin(plugins)
.build()
.await;
let instance = standalone.fe_instance().clone();
let sql = r#"CREATE TABLE demo(
host STRING,
ts TIMESTAMP,
cpu DOUBLE NULL,
memory DOUBLE NULL,
disk_util DOUBLE DEFAULT 9.9,
TIME INDEX (ts),
PRIMARY KEY(host)
) engine=mito;"#;
let output = SqlQueryHandler::do_query(&*instance, sql, QueryContext::arc())
.await
.remove(0)
.unwrap();
assert_eq!(4, counter_hook.c.load(std::sync::atomic::Ordering::Relaxed));
match output.data {
OutputData::AffectedRows(rows) => assert_eq!(rows, 10),
_ => unreachable!(),
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_disable_db_operation_plugin() {
#[derive(Default)]
struct DisableDBOpHook;
impl SqlQueryInterceptor for DisableDBOpHook {
type Error = Error;
fn post_parsing(
&self,
statements: Vec<Statement>,
_query_ctx: QueryContextRef,
) -> Result<Vec<Statement>> {
for s in &statements {
match s {
Statement::CreateDatabase(_) | Statement::ShowDatabases(_) => {
return Err(Error::NotSupported {
feat: "Database operations".to_owned(),
})
}
_ => {}
}
}
Ok(statements)
}
}
let query_ctx = QueryContext::arc();
let plugins = Plugins::new();
let hook = Arc::new(DisableDBOpHook);
plugins.insert::<SqlQueryInterceptorRef<Error>>(hook.clone());
let standalone = GreptimeDbStandaloneBuilder::new("test_disable_db_operation_plugin")
.with_plugin(plugins)
.build()
.await;
let instance = standalone.fe_instance().clone();
let sql = r#"CREATE TABLE demo(
host STRING,
ts TIMESTAMP,
cpu DOUBLE NULL,
memory DOUBLE NULL,
disk_util DOUBLE DEFAULT 9.9,
TIME INDEX (ts),
PRIMARY KEY(host)
) engine=mito;"#;
let output = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
.await
.remove(0)
.unwrap();
match output.data {
OutputData::AffectedRows(rows) => assert_eq!(rows, 0),
_ => unreachable!(),
}
let sql = r#"CREATE DATABASE tomcat"#;
if let Err(e) = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
.await
.remove(0)
{
assert!(matches!(e, error::Error::NotSupported { .. }));
} else {
unreachable!();
}
let sql = r#"SELECT 1; SHOW DATABASES"#;
if let Err(e) = SqlQueryHandler::do_query(&*instance, sql, query_ctx.clone())
.await
.remove(0)
{
assert!(matches!(e, error::Error::NotSupported { .. }));
} else {
unreachable!();
}
}
}