mod show_create_table;
use std::collections::HashMap;
use std::sync::Arc;
use catalog::information_schema::{
columns, flows, key_column_usage, schemata, tables, CHARACTER_SETS, COLLATIONS, COLUMNS, FLOWS,
KEY_COLUMN_USAGE, SCHEMATA, TABLES, VIEWS,
};
use catalog::CatalogManagerRef;
use common_catalog::consts::{
INFORMATION_SCHEMA_NAME, SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_PRIMARY_KEY,
SEMANTIC_TYPE_TIME_INDEX,
};
use common_catalog::format_full_table_name;
use common_datasource::file_format::{infer_schemas, FileFormat, Format};
use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::build_backend;
use common_datasource::util::find_dir_and_filename;
use common_meta::key::flow::flow_info::FlowInfoValue;
use common_meta::SchemaOptions;
use common_query::prelude::GREPTIME_TIMESTAMP;
use common_query::Output;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::RecordBatches;
use common_time::timezone::get_timezone;
use common_time::Timestamp;
use datafusion::common::ScalarValue;
use datafusion::prelude::SessionContext;
use datafusion_expr::{case, col, lit, Expr};
use datatypes::prelude::*;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, RawSchema, Schema};
use datatypes::vectors::StringVector;
use object_store::ObjectStore;
use once_cell::sync::Lazy;
use regex::Regex;
use session::context::QueryContextRef;
pub use show_create_table::create_table_stmt;
use snafu::{ensure, OptionExt, ResultExt};
use sql::ast::Ident;
use sql::parser::ParserContext;
use sql::statements::create::{CreateDatabase, CreateFlow, CreateView, Partitions};
use sql::statements::show::{
ShowColumns, ShowDatabases, ShowFlows, ShowIndex, ShowKind, ShowTableStatus, ShowTables,
ShowVariables, ShowViews,
};
use sql::statements::statement::Statement;
use sql::statements::OptionMap;
use sqlparser::ast::ObjectName;
use table::requests::{FILE_TABLE_LOCATION_KEY, FILE_TABLE_PATTERN_KEY};
use table::TableRef;
use crate::dataframe::DataFrame;
use crate::error::{self, Result, UnsupportedVariableSnafu};
use crate::planner::DfLogicalPlanner;
use crate::QueryEngineRef;
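// Output column names for the various `SHOW ...` and `DESC TABLE` results.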
const SCHEMAS_COLUMN: &str = "Database";
const OPTIONS_COLUMN: &str = "Options";
const TABLES_COLUMN: &str = "Tables";
const VIEWS_COLUMN: &str = "Views";
const FLOWS_COLUMN: &str = "Flows";
const FIELD_COLUMN: &str = "Field";
const TABLE_TYPE_COLUMN: &str = "Table_type";
const COLUMN_NAME_COLUMN: &str = "Column";
const COLUMN_GREPTIME_TYPE_COLUMN: &str = "Greptime_type";
const COLUMN_TYPE_COLUMN: &str = "Type";
const COLUMN_KEY_COLUMN: &str = "Key";
const COLUMN_EXTRA_COLUMN: &str = "Extra";
const COLUMN_PRIVILEGES_COLUMN: &str = "Privileges";
const COLUMN_COLLATION_COLUMN: &str = "Collation";
const COLUMN_NULLABLE_COLUMN: &str = "Null";
const COLUMN_DEFAULT_COLUMN: &str = "Default";
const COLUMN_COMMENT_COLUMN: &str = "Comment";
const COLUMN_SEMANTIC_TYPE_COLUMN: &str = "Semantic Type";
const YES_STR: &str = "YES";
const NO_STR: &str = "NO";
const PRI_KEY: &str = "PRI";
const TIME_INDEX: &str = "TIME INDEX";
const INDEX_TABLE_COLUMN: &str = "Table";
const INDEX_NON_UNIQUE_COLUMN: &str = "Non_unique";
const INDEX_CARDINALITY_COLUMN: &str = "Cardinality";
const INDEX_SUB_PART_COLUMN: &str = "Sub_part";
const INDEX_PACKED_COLUMN: &str = "Packed";
const INDEX_INDEX_TYPE_COLUMN: &str = "Index_type";
const INDEX_COMMENT_COLUMN: &str = "Index_comment";
const INDEX_VISIBLE_COLUMN: &str = "Visible";
const INDEX_EXPRESSION_COLUMN: &str = "Expression";
const INDEX_KEY_NAME_COLUMN: &str = "Key_name";
const INDEX_SEQ_IN_INDEX_COLUMN: &str = "Seq_in_index";
const INDEX_COLUMN_NAME_COLUMN: &str = "Column_name";
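// Fixed output schemas for `DESC TABLE` and the `SHOW CREATE ...` statements.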
static DESCRIBE_TABLE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
Arc::new(Schema::new(vec![
ColumnSchema::new(
COLUMN_NAME_COLUMN,
ConcreteDataType::string_datatype(),
false,
),
ColumnSchema::new(
COLUMN_TYPE_COLUMN,
ConcreteDataType::string_datatype(),
false,
),
ColumnSchema::new(COLUMN_KEY_COLUMN, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(
COLUMN_NULLABLE_COLUMN,
ConcreteDataType::string_datatype(),
false,
),
ColumnSchema::new(
COLUMN_DEFAULT_COLUMN,
ConcreteDataType::string_datatype(),
false,
),
ColumnSchema::new(
COLUMN_SEMANTIC_TYPE_COLUMN,
ConcreteDataType::string_datatype(),
false,
),
]))
});
static SHOW_CREATE_DATABASE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
Arc::new(Schema::new(vec![
ColumnSchema::new("Database", ConcreteDataType::string_datatype(), false),
ColumnSchema::new(
"Create Database",
ConcreteDataType::string_datatype(),
false,
),
]))
});
static SHOW_CREATE_TABLE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
Arc::new(Schema::new(vec![
ColumnSchema::new("Table", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("Create Table", ConcreteDataType::string_datatype(), false),
]))
});
static SHOW_CREATE_FLOW_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
Arc::new(Schema::new(vec![
ColumnSchema::new("Flow", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("Create Flow", ConcreteDataType::string_datatype(), false),
]))
});
static SHOW_CREATE_VIEW_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
Arc::new(Schema::new(vec![
ColumnSchema::new("View", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("Create View", ConcreteDataType::string_datatype(), false),
]))
});
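/// Shorthand for a `NULL` literal expression.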
fn null() -> Expr {
lit(ScalarValue::Null)
}
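/// Executes `SHOW [FULL] DATABASES`, listing the schemas of the current
/// catalog from `information_schema.schemata`; `FULL` adds the schema
/// options as an extra `Options` column.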
pub async fn show_databases(
stmt: ShowDatabases,
query_engine: &QueryEngineRef,
catalog_manager: &CatalogManagerRef,
query_ctx: QueryContextRef,
) -> Result<Output> {
let projects = if stmt.full {
vec![
(schemata::SCHEMA_NAME, SCHEMAS_COLUMN),
(schemata::SCHEMA_OPTS, OPTIONS_COLUMN),
]
} else {
vec![(schemata::SCHEMA_NAME, SCHEMAS_COLUMN)]
};
let filters = vec![col(schemata::CATALOG_NAME).eq(lit(query_ctx.current_catalog()))];
let like_field = Some(schemata::SCHEMA_NAME);
let sort = vec![col(schemata::SCHEMA_NAME).sort(true, true)];
query_from_information_schema_table(
query_engine,
catalog_manager,
query_ctx,
SCHEMATA,
vec![],
projects,
filters,
like_field,
sort,
stmt.kind,
)
.await
}
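/// Runs a `SHOW ...` statement as a query against an `information_schema`
/// table. The pipeline: apply the optional `select` expressions, then
/// `filters`, then the statement's `LIKE` pattern against `like_field` (if
/// both are present), sort, project and rename columns per `projects`, and
/// finally plan the statement's `WHERE` clause against the renamed columns.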
#[allow(clippy::too_many_arguments)]
async fn query_from_information_schema_table(
query_engine: &QueryEngineRef,
catalog_manager: &CatalogManagerRef,
query_ctx: QueryContextRef,
table_name: &str,
select: Vec<Expr>,
projects: Vec<(&str, &str)>,
filters: Vec<Expr>,
like_field: Option<&str>,
sort: Vec<Expr>,
kind: ShowKind,
) -> Result<Output> {
let table = catalog_manager
.table(
query_ctx.current_catalog(),
INFORMATION_SCHEMA_NAME,
table_name,
Some(&query_ctx),
)
.await
.context(error::CatalogSnafu)?
.with_context(|| error::TableNotFoundSnafu {
table: format_full_table_name(
query_ctx.current_catalog(),
INFORMATION_SCHEMA_NAME,
table_name,
),
})?;
let DataFrame::DataFusion(dataframe) = query_engine.read_table(table)?;
let dataframe = if select.is_empty() {
dataframe
} else {
dataframe.select(select).context(error::PlanSqlSnafu)?
};
let dataframe = filters.into_iter().try_fold(dataframe, |df, expr| {
df.filter(expr).context(error::PlanSqlSnafu)
})?;
let dataframe = if let (ShowKind::Like(ident), Some(field)) = (&kind, like_field) {
dataframe
.filter(col(field).like(lit(ident.value.clone())))
.context(error::PlanSqlSnafu)?
} else {
dataframe
};
let dataframe = dataframe
.sort(sort)
.context(error::PlanSqlSnafu)?
.select_columns(&projects.iter().map(|(c, _)| *c).collect::<Vec<_>>())
.context(error::PlanSqlSnafu)?;
let dataframe = projects
.into_iter()
.try_fold(dataframe, |df, (column, renamed_column)| {
df.with_column_renamed(column, renamed_column)
.context(error::PlanSqlSnafu)
})?;
let dataframe = match kind {
// The `LIKE` pattern, if any, was already applied above.
ShowKind::All | ShowKind::Like(_) => dataframe,
ShowKind::Where(filter) => {
let view = dataframe.into_view();
let dataframe = SessionContext::new_with_state(
query_engine
.engine_context(query_ctx.clone())
.state()
.clone(),
)
.read_table(view)
.context(error::DataFusionSnafu)?;
let planner = query_engine.planner();
let planner = planner
.as_any()
.downcast_ref::<DfLogicalPlanner>()
.expect("Must be the datafusion planner");
let filter = planner
.sql_to_expr(filter, dataframe.schema(), false, query_ctx)
.await?;
dataframe.filter(filter).context(error::PlanSqlSnafu)?
}
};
let stream = dataframe
.execute_stream()
.await
.context(error::DataFusionSnafu)?;
Ok(Output::new_with_stream(Box::pin(
RecordBatchStreamAdapter::try_new(stream).context(error::CreateRecordBatchSnafu)?,
)))
}
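/// Executes `SHOW [FULL] COLUMNS FROM <table>` against
/// `information_schema.columns`; `FULL` adds the collation, privileges,
/// and comment columns.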
pub async fn show_columns(
stmt: ShowColumns,
query_engine: &QueryEngineRef,
catalog_manager: &CatalogManagerRef,
query_ctx: QueryContextRef,
) -> Result<Output> {
let schema_name = if let Some(database) = stmt.database {
database
} else {
query_ctx.current_schema()
};
let projects = if stmt.full {
vec![
(columns::COLUMN_NAME, FIELD_COLUMN),
(columns::DATA_TYPE, COLUMN_TYPE_COLUMN),
(columns::COLLATION_NAME, COLUMN_COLLATION_COLUMN),
(columns::IS_NULLABLE, COLUMN_NULLABLE_COLUMN),
(columns::COLUMN_KEY, COLUMN_KEY_COLUMN),
(columns::COLUMN_DEFAULT, COLUMN_DEFAULT_COLUMN),
(columns::COLUMN_COMMENT, COLUMN_COMMENT_COLUMN),
(columns::PRIVILEGES, COLUMN_PRIVILEGES_COLUMN),
(columns::EXTRA, COLUMN_EXTRA_COLUMN),
(columns::GREPTIME_DATA_TYPE, COLUMN_GREPTIME_TYPE_COLUMN),
]
} else {
vec![
(columns::COLUMN_NAME, FIELD_COLUMN),
(columns::DATA_TYPE, COLUMN_TYPE_COLUMN),
(columns::IS_NULLABLE, COLUMN_NULLABLE_COLUMN),
(columns::COLUMN_KEY, COLUMN_KEY_COLUMN),
(columns::COLUMN_DEFAULT, COLUMN_DEFAULT_COLUMN),
(columns::EXTRA, COLUMN_EXTRA_COLUMN),
(columns::GREPTIME_DATA_TYPE, COLUMN_GREPTIME_TYPE_COLUMN),
]
};
let filters = vec![
col(columns::TABLE_NAME).eq(lit(&stmt.table)),
col(columns::TABLE_SCHEMA).eq(lit(schema_name.clone())),
col(columns::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
];
let like_field = Some(columns::COLUMN_NAME);
let sort = vec![col(columns::COLUMN_NAME).sort(true, true)];
query_from_information_schema_table(
query_engine,
catalog_manager,
query_ctx,
COLUMNS,
vec![],
projects,
filters,
like_field,
sort,
stmt.kind,
)
.await
}
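/// Executes `SHOW INDEX FROM <table>` against
/// `information_schema.key_column_usage`. Index attributes GreptimeDB does
/// not track (`Cardinality`, `Sub_part`, `Packed`, `Expression`) are filled
/// with `NULL`, and the rest of the MySQL-style columns with constants.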
pub async fn show_index(
stmt: ShowIndex,
query_engine: &QueryEngineRef,
catalog_manager: &CatalogManagerRef,
query_ctx: QueryContextRef,
) -> Result<Output> {
let schema_name = if let Some(database) = stmt.database {
database
} else {
query_ctx.current_schema()
};
let select = vec![
lit(1).alias(INDEX_NON_UNIQUE_COLUMN),
lit("A").alias(COLUMN_COLLATION_COLUMN),
null().alias(INDEX_CARDINALITY_COLUMN),
null().alias(INDEX_SUB_PART_COLUMN),
null().alias(INDEX_PACKED_COLUMN),
case(col(key_column_usage::CONSTRAINT_NAME))
.when(lit(TIME_INDEX), lit(NO_STR))
.otherwise(lit(YES_STR))
.context(error::PlanSqlSnafu)?
.alias(COLUMN_NULLABLE_COLUMN),
lit("greptime-inverted-index-v1").alias(INDEX_INDEX_TYPE_COLUMN),
lit("").alias(COLUMN_COMMENT_COLUMN),
lit("").alias(INDEX_COMMENT_COLUMN),
lit(YES_STR).alias(INDEX_VISIBLE_COLUMN),
null().alias(INDEX_EXPRESSION_COLUMN),
Expr::Wildcard { qualifier: None },
];
let projects = vec![
(key_column_usage::TABLE_NAME, INDEX_TABLE_COLUMN),
(INDEX_NON_UNIQUE_COLUMN, INDEX_NON_UNIQUE_COLUMN),
(key_column_usage::CONSTRAINT_NAME, INDEX_KEY_NAME_COLUMN),
(
key_column_usage::ORDINAL_POSITION,
INDEX_SEQ_IN_INDEX_COLUMN,
),
(key_column_usage::COLUMN_NAME, INDEX_COLUMN_NAME_COLUMN),
(COLUMN_COLLATION_COLUMN, COLUMN_COLLATION_COLUMN),
(INDEX_CARDINALITY_COLUMN, INDEX_CARDINALITY_COLUMN),
(INDEX_SUB_PART_COLUMN, INDEX_SUB_PART_COLUMN),
(INDEX_PACKED_COLUMN, INDEX_PACKED_COLUMN),
(COLUMN_NULLABLE_COLUMN, COLUMN_NULLABLE_COLUMN),
(INDEX_INDEX_TYPE_COLUMN, INDEX_INDEX_TYPE_COLUMN),
(COLUMN_COMMENT_COLUMN, COLUMN_COMMENT_COLUMN),
(INDEX_COMMENT_COLUMN, INDEX_COMMENT_COLUMN),
(INDEX_VISIBLE_COLUMN, INDEX_VISIBLE_COLUMN),
(INDEX_EXPRESSION_COLUMN, INDEX_EXPRESSION_COLUMN),
];
let filters = vec![
col(key_column_usage::TABLE_NAME).eq(lit(&stmt.table)),
col(key_column_usage::TABLE_SCHEMA).eq(lit(schema_name.clone())),
col(key_column_usage::REAL_TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
];
let like_field = None;
let sort = vec![col(key_column_usage::COLUMN_NAME).sort(true, true)];
query_from_information_schema_table(
query_engine,
catalog_manager,
query_ctx,
KEY_COLUMN_USAGE,
select,
projects,
filters,
like_field,
sort,
stmt.kind,
)
.await
}
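/// Executes `SHOW [FULL] TABLES`; `FULL` adds the table type column.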
pub async fn show_tables(
stmt: ShowTables,
query_engine: &QueryEngineRef,
catalog_manager: &CatalogManagerRef,
query_ctx: QueryContextRef,
) -> Result<Output> {
let schema_name = if let Some(database) = stmt.database {
database
} else {
query_ctx.current_schema()
};
let projects = if stmt.full {
vec![
(tables::TABLE_NAME, TABLES_COLUMN),
(tables::TABLE_TYPE, TABLE_TYPE_COLUMN),
]
} else {
vec![(tables::TABLE_NAME, TABLES_COLUMN)]
};
let filters = vec![
col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
];
let like_field = Some(tables::TABLE_NAME);
let sort = vec![col(tables::TABLE_NAME).sort(true, true)];
query_from_information_schema_table(
query_engine,
catalog_manager,
query_ctx,
TABLES,
vec![],
projects,
filters,
like_field,
sort,
stmt.kind,
)
.await
}
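/// Executes `SHOW TABLE STATUS`, projecting the MySQL-compatible status
/// columns from `information_schema.tables`.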
pub async fn show_table_status(
stmt: ShowTableStatus,
query_engine: &QueryEngineRef,
catalog_manager: &CatalogManagerRef,
query_ctx: QueryContextRef,
) -> Result<Output> {
let schema_name = if let Some(database) = stmt.database {
database
} else {
query_ctx.current_schema()
};
let projects = vec![
(tables::TABLE_NAME, "Name"),
(tables::ENGINE, "Engine"),
(tables::VERSION, "Version"),
(tables::ROW_FORMAT, "Row_format"),
(tables::TABLE_ROWS, "Rows"),
(tables::AVG_ROW_LENGTH, "Avg_row_length"),
(tables::DATA_LENGTH, "Data_length"),
(tables::MAX_DATA_LENGTH, "Max_data_length"),
(tables::INDEX_LENGTH, "Index_length"),
(tables::DATA_FREE, "Data_free"),
(tables::AUTO_INCREMENT, "Auto_increment"),
(tables::CREATE_TIME, "Create_time"),
(tables::UPDATE_TIME, "Update_time"),
(tables::CHECK_TIME, "Check_time"),
(tables::TABLE_COLLATION, "Collation"),
(tables::CHECKSUM, "Checksum"),
(tables::CREATE_OPTIONS, "Create_options"),
(tables::TABLE_COMMENT, "Comment"),
];
let filters = vec![
col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
];
let like_field = Some(tables::TABLE_NAME);
let sort = vec![col(tables::TABLE_NAME).sort(true, true)];
query_from_information_schema_table(
query_engine,
catalog_manager,
query_ctx,
TABLES,
vec![],
projects,
filters,
like_field,
sort,
stmt.kind,
)
.await
}
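/// Executes `SHOW COLLATION` by querying `information_schema.collations`.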
pub async fn show_collations(
kind: ShowKind,
query_engine: &QueryEngineRef,
catalog_manager: &CatalogManagerRef,
query_ctx: QueryContextRef,
) -> Result<Output> {
let projects = vec![
("collation_name", "Collation"),
("character_set_name", "Charset"),
("id", "Id"),
("is_default", "Default"),
("is_compiled", "Compiled"),
("sortlen", "Sortlen"),
];
let filters = vec![];
let like_field = Some("collation_name");
let sort = vec![];
query_from_information_schema_table(
query_engine,
catalog_manager,
query_ctx,
COLLATIONS,
vec![],
projects,
filters,
like_field,
sort,
kind,
)
.await
}
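/// Executes `SHOW CHARACTER SET` by querying
/// `information_schema.character_sets`.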
pub async fn show_charsets(
kind: ShowKind,
query_engine: &QueryEngineRef,
catalog_manager: &CatalogManagerRef,
query_ctx: QueryContextRef,
) -> Result<Output> {
let projects = vec![
("character_set_name", "Charset"),
("description", "Description"),
("default_collate_name", "Default collation"),
("maxlen", "Maxlen"),
];
let filters = vec![];
let like_field = Some("character_set_name");
let sort = vec![];
query_from_information_schema_table(
query_engine,
catalog_manager,
query_ctx,
CHARACTER_SETS,
vec![],
projects,
filters,
like_field,
sort,
kind,
)
.await
}
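/// Executes `SHOW VARIABLES <variable>` for the supported session variables
/// (`TIME_ZONE`/`TIMEZONE`, `SYSTEM_TIME_ZONE`/`SYSTEM_TIMEZONE`, and
/// `DATESTYLE`); anything else fails with an unsupported-variable error.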
pub fn show_variable(stmt: ShowVariables, query_ctx: QueryContextRef) -> Result<Output> {
let variable = stmt.variable.to_string().to_uppercase();
let value = match variable.as_str() {
"SYSTEM_TIME_ZONE" | "SYSTEM_TIMEZONE" => get_timezone(None).to_string(),
"TIME_ZONE" | "TIMEZONE" => query_ctx.timezone().to_string(),
"DATESTYLE" => {
let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style();
format!("{}, {}", style, order)
}
_ => return UnsupportedVariableSnafu { name: variable }.fail(),
};
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
variable,
ConcreteDataType::string_datatype(),
false,
)]));
let records = RecordBatches::try_from_columns(
schema,
vec![Arc::new(StringVector::from(vec![value])) as _],
)
.context(error::CreateRecordBatchSnafu)?;
Ok(Output::new_with_record_batches(records))
}
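/// Executes `SHOW STATUS`. GreptimeDB keeps no server status variables, so
/// this always returns an empty result with the MySQL-compatible schema.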
pub async fn show_status(_query_ctx: QueryContextRef) -> Result<Output> {
let schema = Arc::new(Schema::new(vec![
ColumnSchema::new("Variable_name", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("Value", ConcreteDataType::string_datatype(), true),
]));
let records = RecordBatches::try_from_columns(
schema,
vec![
Arc::new(StringVector::from(Vec::<&str>::new())) as _,
Arc::new(StringVector::from(Vec::<&str>::new())) as _,
],
)
.context(error::CreateRecordBatchSnafu)?;
Ok(Output::new_with_record_batches(records))
}
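/// Executes `SHOW CREATE DATABASE <database>`, rebuilding a
/// `CREATE DATABASE IF NOT EXISTS` statement from the name and options.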
pub fn show_create_database(database_name: &str, options: OptionMap) -> Result<Output> {
let stmt = CreateDatabase {
name: ObjectName(vec![Ident {
value: database_name.to_string(),
quote_style: None,
}]),
if_not_exists: true,
options,
};
let sql = format!("{stmt}");
let columns = vec![
Arc::new(StringVector::from(vec![database_name.to_string()])) as _,
Arc::new(StringVector::from(vec![sql])) as _,
];
let records =
RecordBatches::try_from_columns(SHOW_CREATE_DATABASE_OUTPUT_SCHEMA.clone(), columns)
.context(error::CreateRecordBatchSnafu)?;
Ok(Output::new_with_record_batches(records))
}
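/// Executes `SHOW CREATE TABLE <table>`, rebuilding a `CREATE TABLE`
/// statement (including partition rules, if any) from the table metadata.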
pub fn show_create_table(
table: TableRef,
schema_options: Option<SchemaOptions>,
partitions: Option<Partitions>,
query_ctx: QueryContextRef,
) -> Result<Output> {
let table_info = table.table_info();
let table_name = &table_info.name;
let quote_style = query_ctx.quote_style();
let mut stmt = create_table_stmt(&table_info, schema_options, quote_style)?;
stmt.partitions = partitions.map(|mut p| {
p.set_quote(quote_style);
p
});
let sql = format!("{}", stmt);
let columns = vec![
Arc::new(StringVector::from(vec![table_name.clone()])) as _,
Arc::new(StringVector::from(vec![sql])) as _,
];
let records = RecordBatches::try_from_columns(SHOW_CREATE_TABLE_OUTPUT_SCHEMA.clone(), columns)
.context(error::CreateRecordBatchSnafu)?;
Ok(Output::new_with_record_batches(records))
}
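/// Executes `SHOW CREATE VIEW <view>`, re-parsing the stored view definition
/// and rendering it as a `CREATE VIEW` statement under the given name.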
pub fn show_create_view(
view_name: ObjectName,
definition: &str,
query_ctx: QueryContextRef,
) -> Result<Output> {
let mut parser_ctx =
ParserContext::new(query_ctx.sql_dialect(), definition).context(error::SqlSnafu)?;
let Statement::CreateView(create_view) =
parser_ctx.parse_statement().context(error::SqlSnafu)?
else {
// A view's stored definition is always a `CREATE VIEW` statement.
unreachable!();
};
let stmt = CreateView {
name: view_name.clone(),
columns: create_view.columns,
query: create_view.query,
or_replace: create_view.or_replace,
if_not_exists: create_view.if_not_exists,
};
let sql = format!("{}", stmt);
let columns = vec![
Arc::new(StringVector::from(vec![view_name.to_string()])) as _,
Arc::new(StringVector::from(vec![sql])) as _,
];
let records = RecordBatches::try_from_columns(SHOW_CREATE_VIEW_OUTPUT_SCHEMA.clone(), columns)
.context(error::CreateRecordBatchSnafu)?;
Ok(Output::new_with_record_batches(records))
}
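/// Executes `SHOW VIEWS` by querying `information_schema.views`.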
pub async fn show_views(
stmt: ShowViews,
query_engine: &QueryEngineRef,
catalog_manager: &CatalogManagerRef,
query_ctx: QueryContextRef,
) -> Result<Output> {
let schema_name = if let Some(database) = stmt.database {
database
} else {
query_ctx.current_schema()
};
let projects = vec![(tables::TABLE_NAME, VIEWS_COLUMN)];
let filters = vec![
col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
];
let like_field = Some(tables::TABLE_NAME);
let sort = vec![col(tables::TABLE_NAME).sort(true, true)];
query_from_information_schema_table(
query_engine,
catalog_manager,
query_ctx,
VIEWS,
vec![],
projects,
filters,
like_field,
sort,
stmt.kind,
)
.await
}
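/// Executes `SHOW FLOWS` by querying `information_schema.flows`.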
pub async fn show_flows(
stmt: ShowFlows,
query_engine: &QueryEngineRef,
catalog_manager: &CatalogManagerRef,
query_ctx: QueryContextRef,
) -> Result<Output> {
let projects = vec![(flows::FLOW_NAME, FLOWS_COLUMN)];
let filters = vec![col(flows::TABLE_CATALOG).eq(lit(query_ctx.current_catalog()))];
let like_field = Some(flows::FLOW_NAME);
let sort = vec![col(flows::FLOW_NAME).sort(true, true)];
query_from_information_schema_table(
query_engine,
catalog_manager,
query_ctx,
FLOWS,
vec![],
projects,
filters,
like_field,
sort,
stmt.kind,
)
.await
}
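/// Executes `SHOW CREATE FLOW <flow>`, rebuilding a `CREATE FLOW` statement
/// from the stored flow metadata and its raw SQL.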
pub fn show_create_flow(
flow_name: ObjectName,
flow_val: FlowInfoValue,
query_ctx: QueryContextRef,
) -> Result<Output> {
let mut parser_ctx =
ParserContext::new(query_ctx.sql_dialect(), flow_val.raw_sql()).context(error::SqlSnafu)?;
let query = parser_ctx.parser_query().context(error::SqlSnafu)?;
let comment = if flow_val.comment().is_empty() {
None
} else {
Some(flow_val.comment().clone())
};
let stmt = CreateFlow {
flow_name,
sink_table_name: ObjectName(vec![Ident {
value: flow_val.sink_table_name().table_name.clone(),
quote_style: None,
}]),
or_replace: true,
if_not_exists: true,
expire_after: flow_val.expire_after(),
comment,
query,
};
let sql = format!("{}", stmt);
let columns = vec![
Arc::new(StringVector::from(vec![flow_val.flow_name().clone()])) as _,
Arc::new(StringVector::from(vec![sql])) as _,
];
let records = RecordBatches::try_from_columns(SHOW_CREATE_FLOW_OUTPUT_SCHEMA.clone(), columns)
.context(error::CreateRecordBatchSnafu)?;
Ok(Output::new_with_record_batches(records))
}
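/// Executes `DESC[RIBE] TABLE <table>`, reporting each column's name, type,
/// key, nullability, default value, and semantic type.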
pub fn describe_table(table: TableRef) -> Result<Output> {
let table_info = table.table_info();
let columns_schemas = table_info.meta.schema.column_schemas();
let columns = vec![
describe_column_names(columns_schemas),
describe_column_types(columns_schemas),
describe_column_keys(columns_schemas, &table_info.meta.primary_key_indices),
describe_column_nullables(columns_schemas),
describe_column_defaults(columns_schemas),
describe_column_semantic_types(columns_schemas, &table_info.meta.primary_key_indices),
];
let records = RecordBatches::try_from_columns(DESCRIBE_TABLE_OUTPUT_SCHEMA.clone(), columns)
.context(error::CreateRecordBatchSnafu)?;
Ok(Output::new_with_record_batches(records))
}
fn describe_column_names(columns_schemas: &[ColumnSchema]) -> VectorRef {
Arc::new(StringVector::from_iterator(
columns_schemas.iter().map(|cs| cs.name.as_str()),
))
}
fn describe_column_types(columns_schemas: &[ColumnSchema]) -> VectorRef {
Arc::new(StringVector::from(
columns_schemas
.iter()
.map(|cs| cs.data_type.name())
.collect::<Vec<_>>(),
))
}
fn describe_column_keys(
columns_schemas: &[ColumnSchema],
primary_key_indices: &[usize],
) -> VectorRef {
Arc::new(StringVector::from_iterator(
columns_schemas.iter().enumerate().map(|(i, cs)| {
if cs.is_time_index() || primary_key_indices.contains(&i) {
PRI_KEY
} else {
""
}
}),
))
}
fn describe_column_nullables(columns_schemas: &[ColumnSchema]) -> VectorRef {
Arc::new(StringVector::from_iterator(columns_schemas.iter().map(
|cs| {
if cs.is_nullable() {
YES_STR
} else {
NO_STR
}
},
)))
}
fn describe_column_defaults(columns_schemas: &[ColumnSchema]) -> VectorRef {
Arc::new(StringVector::from(
columns_schemas
.iter()
.map(|cs| {
cs.default_constraint()
.map_or(String::from(""), |dc| dc.to_string())
})
.collect::<Vec<String>>(),
))
}
fn describe_column_semantic_types(
columns_schemas: &[ColumnSchema],
primary_key_indices: &[usize],
) -> VectorRef {
Arc::new(StringVector::from_iterator(
columns_schemas.iter().enumerate().map(|(i, cs)| {
if primary_key_indices.contains(&i) {
SEMANTIC_TYPE_PRIMARY_KEY
} else if cs.is_time_index() {
SEMANTIC_TYPE_TIME_INDEX
} else {
SEMANTIC_TYPE_FIELD
}
}),
))
}
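/// Builds the object-store backend for a file table from its options and
/// lists the matching files (honoring the optional filename pattern),
/// filtering out directory entries.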
pub async fn prepare_file_table_files(
options: &HashMap<String, String>,
) -> Result<(ObjectStore, Vec<String>)> {
let url = options
.get(FILE_TABLE_LOCATION_KEY)
.context(error::MissingRequiredFieldSnafu {
name: FILE_TABLE_LOCATION_KEY,
})?;
let (dir, filename) = find_dir_and_filename(url);
let source = if let Some(filename) = filename {
Source::Filename(filename)
} else {
Source::Dir
};
let regex = options
.get(FILE_TABLE_PATTERN_KEY)
.map(|x| Regex::new(x))
.transpose()
.context(error::BuildRegexSnafu)?;
let object_store = build_backend(url, options).context(error::BuildBackendSnafu)?;
let lister = Lister::new(object_store.clone(), source, dir, regex);
let files = lister
.list()
.await
.context(error::ListObjectsSnafu)?
.into_iter()
.filter_map(|entry| {
if entry.path().ends_with('/') {
None
} else {
Some(entry.path().to_string())
}
})
.collect::<Vec<_>>();
Ok((object_store, files))
}
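/// Infers the schema of a file table by merging the schemas inferred from
/// all of its files.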
pub async fn infer_file_table_schema(
object_store: &ObjectStore,
files: &[String],
options: &HashMap<String, String>,
) -> Result<RawSchema> {
let format = parse_file_table_format(options)?;
let merged = infer_schemas(object_store, files, format.as_ref())
.await
.context(error::InferSchemaSnafu)?;
Ok(RawSchema::from(
&Schema::try_from(merged).context(error::ConvertSchemaSnafu)?,
))
}
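/// Converts file column schemas into table column schemas, returning them
/// together with the time index column name. If the file schema lacks a time
/// index, a non-null `GREPTIME_TIMESTAMP` column (millisecond timestamp,
/// default 0) is appended, or replaces an existing column of that name, to
/// serve as the time index.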
pub fn file_column_schemas_to_table(
file_column_schemas: &[ColumnSchema],
) -> (Vec<ColumnSchema>, String) {
let mut column_schemas = file_column_schemas.to_owned();
if let Some(time_index_column) = column_schemas.iter().find(|c| c.is_time_index()) {
let time_index = time_index_column.name.clone();
return (column_schemas, time_index);
}
let timestamp_type = ConcreteDataType::timestamp_millisecond_datatype();
let default_zero = Value::Timestamp(Timestamp::new_millisecond(0));
let timestamp_column_schema = ColumnSchema::new(GREPTIME_TIMESTAMP, timestamp_type, false)
.with_time_index(true)
.with_default_constraint(Some(ColumnDefaultConstraint::Value(default_zero)))
.unwrap();
if let Some(column_schema) = column_schemas
.iter_mut()
.find(|column_schema| column_schema.name == GREPTIME_TIMESTAMP)
{
*column_schema = timestamp_column_schema;
} else {
column_schemas.push(timestamp_column_schema);
}
(column_schemas, GREPTIME_TIMESTAMP.to_string())
}
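/// Ensures a file schema is compatible with the table schema: each table
/// column must either be Arrow-castable from the file column of the same
/// name, or, if absent from the file, be nullable or carry a default value.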
pub fn check_file_to_table_schema_compatibility(
file_column_schemas: &[ColumnSchema],
table_column_schemas: &[ColumnSchema],
) -> Result<()> {
let file_schemas_map = file_column_schemas
.iter()
.map(|s| (s.name.clone(), s))
.collect::<HashMap<_, _>>();
for table_column in table_column_schemas {
if let Some(file_column) = file_schemas_map.get(&table_column.name) {
ensure!(
file_column
.data_type
.can_arrow_type_cast_to(&table_column.data_type),
error::ColumnSchemaIncompatibleSnafu {
column: table_column.name.clone(),
file_type: file_column.data_type.clone(),
table_type: table_column.data_type.clone(),
}
);
} else {
ensure!(
table_column.is_nullable() || table_column.default_constraint().is_some(),
error::ColumnSchemaNoDefaultSnafu {
column: table_column.name.clone(),
}
);
}
}
Ok(())
}
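/// Resolves the file format (CSV, JSON, Parquet, or ORC) declared in the
/// table options.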
fn parse_file_table_format(options: &HashMap<String, String>) -> Result<Box<dyn FileFormat>> {
Ok(
match Format::try_from(options).context(error::ParseFileFormatSnafu)? {
Format::Csv(format) => Box::new(format),
Format::Json(format) => Box::new(format),
Format::Parquet(format) => Box::new(format),
Format::Orc(format) => Box::new(format),
},
)
}
#[cfg(test)]
mod test {
use std::sync::Arc;
use common_query::{Output, OutputData};
use common_recordbatch::{RecordBatch, RecordBatches};
use common_time::timestamp::TimeUnit;
use common_time::Timezone;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema, SchemaRef};
use datatypes::vectors::{StringVector, TimestampMillisecondVector, UInt32Vector, VectorRef};
use session::context::QueryContextBuilder;
use snafu::ResultExt;
use sql::ast::{Ident, ObjectName};
use sql::statements::show::ShowVariables;
use table::test_util::MemTable;
use table::TableRef;
use super::show_variable;
use crate::error;
use crate::error::Result;
use crate::sql::{
describe_table, DESCRIBE_TABLE_OUTPUT_SCHEMA, NO_STR, SEMANTIC_TYPE_FIELD,
SEMANTIC_TYPE_TIME_INDEX, YES_STR,
};
#[test]
fn test_describe_table_multiple_columns() -> Result<()> {
let table_name = "test_table";
let schema = vec![
ColumnSchema::new("t1", ConcreteDataType::uint32_datatype(), true),
ColumnSchema::new(
"t2",
ConcreteDataType::timestamp_datatype(TimeUnit::Millisecond),
false,
)
.with_default_constraint(Some(ColumnDefaultConstraint::Function(String::from(
"current_timestamp()",
))))
.unwrap()
.with_time_index(true),
];
let data = vec![
Arc::new(UInt32Vector::from_slice([0])) as _,
Arc::new(TimestampMillisecondVector::from_slice([0])) as _,
];
let expected_columns = vec![
Arc::new(StringVector::from(vec!["t1", "t2"])) as _,
Arc::new(StringVector::from(vec!["UInt32", "TimestampMillisecond"])) as _,
Arc::new(StringVector::from(vec!["", "PRI"])) as _,
Arc::new(StringVector::from(vec![YES_STR, NO_STR])) as _,
Arc::new(StringVector::from(vec!["", "current_timestamp()"])) as _,
Arc::new(StringVector::from(vec![
SEMANTIC_TYPE_FIELD,
SEMANTIC_TYPE_TIME_INDEX,
])) as _,
];
describe_table_test_by_schema(table_name, schema, data, expected_columns)
}
fn describe_table_test_by_schema(
table_name: &str,
schema: Vec<ColumnSchema>,
data: Vec<VectorRef>,
expected_columns: Vec<VectorRef>,
) -> Result<()> {
let table_schema = SchemaRef::new(Schema::new(schema));
let table = prepare_describe_table(table_name, table_schema, data);
let expected =
RecordBatches::try_from_columns(DESCRIBE_TABLE_OUTPUT_SCHEMA.clone(), expected_columns)
.context(error::CreateRecordBatchSnafu)?;
if let OutputData::RecordBatches(res) = describe_table(table)?.data {
assert_eq!(res.take(), expected.take());
} else {
panic!("describe table must return record batch");
}
Ok(())
}
fn prepare_describe_table(
table_name: &str,
table_schema: SchemaRef,
data: Vec<VectorRef>,
) -> TableRef {
let record_batch = RecordBatch::new(table_schema, data).unwrap();
MemTable::table(table_name, record_batch)
}
#[test]
fn test_show_variable() {
assert_eq!(
exec_show_variable("SYSTEM_TIME_ZONE", "Asia/Shanghai").unwrap(),
"UTC"
);
assert_eq!(
exec_show_variable("SYSTEM_TIMEZONE", "Asia/Shanghai").unwrap(),
"UTC"
);
assert_eq!(
exec_show_variable("TIME_ZONE", "Asia/Shanghai").unwrap(),
"Asia/Shanghai"
);
assert_eq!(
exec_show_variable("TIMEZONE", "Asia/Shanghai").unwrap(),
"Asia/Shanghai"
);
assert!(exec_show_variable("TIME ZONE", "Asia/Shanghai").is_err());
assert!(exec_show_variable("SYSTEM TIME ZONE", "Asia/Shanghai").is_err());
}
fn exec_show_variable(variable: &str, tz: &str) -> Result<String> {
let stmt = ShowVariables {
variable: ObjectName(vec![Ident::new(variable)]),
};
let ctx = Arc::new(
QueryContextBuilder::default()
.timezone(Timezone::from_tz_string(tz).unwrap())
.build(),
);
match show_variable(stmt, ctx) {
Ok(Output {
data: OutputData::RecordBatches(record),
..
}) => {
let record = record.take().first().cloned().unwrap();
let data = record.column(0);
Ok(data.get(0).to_string())
}
Ok(_) => unreachable!(),
Err(e) => Err(e),
}
}
}