1mod show_create_table;
16
17use std::collections::HashMap;
18use std::sync::Arc;
19
20use catalog::information_schema::{
21 columns, flows, key_column_usage, process_list, region_peers, schemata, tables, CHARACTER_SETS,
22 COLLATIONS, COLUMNS, FLOWS, KEY_COLUMN_USAGE, REGION_PEERS, SCHEMATA, TABLES, VIEWS,
23};
24use catalog::CatalogManagerRef;
25use common_catalog::consts::{
26 INFORMATION_SCHEMA_NAME, SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_PRIMARY_KEY,
27 SEMANTIC_TYPE_TIME_INDEX,
28};
29use common_catalog::format_full_table_name;
30use common_datasource::file_format::{infer_schemas, FileFormat, Format};
31use common_datasource::lister::{Lister, Source};
32use common_datasource::object_store::build_backend;
33use common_datasource::util::find_dir_and_filename;
34use common_meta::key::flow::flow_info::FlowInfoValue;
35use common_meta::SchemaOptions;
36use common_query::prelude::GREPTIME_TIMESTAMP;
37use common_query::Output;
38use common_recordbatch::adapter::RecordBatchStreamAdapter;
39use common_recordbatch::RecordBatches;
40use common_time::timezone::get_timezone;
41use common_time::Timestamp;
42use datafusion::common::ScalarValue;
43use datafusion::prelude::SessionContext;
44use datafusion_expr::{case, col, lit, Expr, SortExpr};
45use datatypes::prelude::*;
46use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, RawSchema, Schema};
47use datatypes::vectors::StringVector;
48use itertools::Itertools;
49use object_store::ObjectStore;
50use once_cell::sync::Lazy;
51use regex::Regex;
52use session::context::{Channel, QueryContextRef};
53pub use show_create_table::create_table_stmt;
54use snafu::{ensure, OptionExt, ResultExt};
55use sql::ast::Ident;
56use sql::parser::ParserContext;
57use sql::statements::create::{CreateDatabase, CreateFlow, CreateView, Partitions, SqlOrTql};
58use sql::statements::show::{
59 ShowColumns, ShowDatabases, ShowFlows, ShowIndex, ShowKind, ShowProcessList, ShowRegion,
60 ShowTableStatus, ShowTables, ShowVariables, ShowViews,
61};
62use sql::statements::statement::Statement;
63use sql::statements::OptionMap;
64use sqlparser::ast::ObjectName;
65use store_api::metric_engine_consts::{is_metric_engine, is_metric_engine_internal_column};
66use table::requests::{FILE_TABLE_LOCATION_KEY, FILE_TABLE_PATTERN_KEY};
67use table::TableRef;
68
69use crate::dataframe::DataFrame;
70use crate::error::{self, Result, UnsupportedVariableSnafu};
71use crate::planner::DfLogicalPlanner;
72use crate::QueryEngineRef;
73
// Output column aliases shared by the `SHOW ...` statement handlers below.
// Most mirror MySQL's column names so MySQL-protocol clients render correctly.
const SCHEMAS_COLUMN: &str = "Database";
const OPTIONS_COLUMN: &str = "Options";
const TABLES_COLUMN: &str = "Tables";
const VIEWS_COLUMN: &str = "Views";
const FLOWS_COLUMN: &str = "Flows";
const FIELD_COLUMN: &str = "Field";
const TABLE_TYPE_COLUMN: &str = "Table_type";
const COLUMN_NAME_COLUMN: &str = "Column";
const COLUMN_GREPTIME_TYPE_COLUMN: &str = "Greptime_type";
const COLUMN_TYPE_COLUMN: &str = "Type";
const COLUMN_KEY_COLUMN: &str = "Key";
const COLUMN_EXTRA_COLUMN: &str = "Extra";
const COLUMN_PRIVILEGES_COLUMN: &str = "Privileges";
const COLUMN_COLLATION_COLUMN: &str = "Collation";
const COLUMN_NULLABLE_COLUMN: &str = "Null";
const COLUMN_DEFAULT_COLUMN: &str = "Default";
const COLUMN_COMMENT_COLUMN: &str = "Comment";
const COLUMN_SEMANTIC_TYPE_COLUMN: &str = "Semantic Type";

// Literal cell values used in `DESCRIBE`/`SHOW` result sets.
const YES_STR: &str = "YES";
const NO_STR: &str = "NO";
const PRI_KEY: &str = "PRI";
const TIME_INDEX: &str = "TIME INDEX";

// Column aliases for `SHOW INDEX` output (MySQL-compatible column names).
const INDEX_TABLE_COLUMN: &str = "Table";
// NOTE(review): "NONT" looks like a typo for "NON"; the identifier is private
// to this file and its value ("Non_unique") is correct, so the name is kept to
// avoid touching every use site in this file from a single edit.
const INDEX_NONT_UNIQUE_COLUMN: &str = "Non_unique";
const INDEX_CARDINALITY_COLUMN: &str = "Cardinality";
const INDEX_SUB_PART_COLUMN: &str = "Sub_part";
const INDEX_PACKED_COLUMN: &str = "Packed";
const INDEX_INDEX_TYPE_COLUMN: &str = "Index_type";
const INDEX_COMMENT_COLUMN: &str = "Index_comment";
const INDEX_VISIBLE_COLUMN: &str = "Visible";
const INDEX_EXPRESSION_COLUMN: &str = "Expression";
const INDEX_KEY_NAME_COLUMN: &str = "Key_name";
const INDEX_SEQ_IN_INDEX_COLUMN: &str = "Seq_in_index";
const INDEX_COLUMN_NAME_COLUMN: &str = "Column_name";
111
112static DESCRIBE_TABLE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
113 Arc::new(Schema::new(vec![
114 ColumnSchema::new(
115 COLUMN_NAME_COLUMN,
116 ConcreteDataType::string_datatype(),
117 false,
118 ),
119 ColumnSchema::new(
120 COLUMN_TYPE_COLUMN,
121 ConcreteDataType::string_datatype(),
122 false,
123 ),
124 ColumnSchema::new(COLUMN_KEY_COLUMN, ConcreteDataType::string_datatype(), true),
125 ColumnSchema::new(
126 COLUMN_NULLABLE_COLUMN,
127 ConcreteDataType::string_datatype(),
128 false,
129 ),
130 ColumnSchema::new(
131 COLUMN_DEFAULT_COLUMN,
132 ConcreteDataType::string_datatype(),
133 false,
134 ),
135 ColumnSchema::new(
136 COLUMN_SEMANTIC_TYPE_COLUMN,
137 ConcreteDataType::string_datatype(),
138 false,
139 ),
140 ]))
141});
142
143static SHOW_CREATE_DATABASE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
144 Arc::new(Schema::new(vec![
145 ColumnSchema::new("Database", ConcreteDataType::string_datatype(), false),
146 ColumnSchema::new(
147 "Create Database",
148 ConcreteDataType::string_datatype(),
149 false,
150 ),
151 ]))
152});
153
154static SHOW_CREATE_TABLE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
155 Arc::new(Schema::new(vec![
156 ColumnSchema::new("Table", ConcreteDataType::string_datatype(), false),
157 ColumnSchema::new("Create Table", ConcreteDataType::string_datatype(), false),
158 ]))
159});
160
161static SHOW_CREATE_FLOW_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
162 Arc::new(Schema::new(vec![
163 ColumnSchema::new("Flow", ConcreteDataType::string_datatype(), false),
164 ColumnSchema::new("Create Flow", ConcreteDataType::string_datatype(), false),
165 ]))
166});
167
168static SHOW_CREATE_VIEW_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
169 Arc::new(Schema::new(vec![
170 ColumnSchema::new("View", ConcreteDataType::string_datatype(), false),
171 ColumnSchema::new("Create View", ConcreteDataType::string_datatype(), false),
172 ]))
173});
174
/// A literal SQL `NULL` expression, used to fill `SHOW INDEX` output columns
/// that GreptimeDB does not track (e.g. cardinality, sub_part).
fn null() -> Expr {
    lit(ScalarValue::Null)
}
178
179pub async fn show_databases(
180 stmt: ShowDatabases,
181 query_engine: &QueryEngineRef,
182 catalog_manager: &CatalogManagerRef,
183 query_ctx: QueryContextRef,
184) -> Result<Output> {
185 let projects = if stmt.full {
186 vec![
187 (schemata::SCHEMA_NAME, SCHEMAS_COLUMN),
188 (schemata::SCHEMA_OPTS, OPTIONS_COLUMN),
189 ]
190 } else {
191 vec![(schemata::SCHEMA_NAME, SCHEMAS_COLUMN)]
192 };
193
194 let filters = vec![col(schemata::CATALOG_NAME).eq(lit(query_ctx.current_catalog()))];
195 let like_field = Some(schemata::SCHEMA_NAME);
196 let sort = vec![col(schemata::SCHEMA_NAME).sort(true, true)];
197
198 query_from_information_schema_table(
199 query_engine,
200 catalog_manager,
201 query_ctx,
202 SCHEMATA,
203 vec![],
204 projects,
205 filters,
206 like_field,
207 sort,
208 stmt.kind,
209 )
210 .await
211}
212
/// Runs a `SELECT`-like query against one `information_schema` table and
/// returns the result as a stream.
///
/// The plan is assembled in stages on a DataFusion dataframe:
/// 1. apply the caller's fixed `filters` (catalog/schema/table scoping),
/// 2. apply the `LIKE` pattern from `kind` to `like_field` when both exist,
/// 3. apply `sort` (if any),
/// 4. project: explicit `select` expressions win; otherwise the
///    `projects` (source column, display alias) pairs are selected,
/// 5. rename source columns to their display aliases, then — for
///    `SHOW ... WHERE <expr>` — plan the user expression against the aliased
///    schema and filter with it.
#[allow(clippy::too_many_arguments)]
async fn query_from_information_schema_table(
    query_engine: &QueryEngineRef,
    catalog_manager: &CatalogManagerRef,
    query_ctx: QueryContextRef,
    table_name: &str,
    select: Vec<Expr>,
    projects: Vec<(&str, &str)>,
    filters: Vec<Expr>,
    like_field: Option<&str>,
    sort: Vec<SortExpr>,
    kind: ShowKind,
) -> Result<Output> {
    // Resolve the information_schema table within the session's catalog.
    let table = catalog_manager
        .table(
            query_ctx.current_catalog(),
            INFORMATION_SCHEMA_NAME,
            table_name,
            Some(&query_ctx),
        )
        .await
        .context(error::CatalogSnafu)?
        .with_context(|| error::TableNotFoundSnafu {
            table: format_full_table_name(
                query_ctx.current_catalog(),
                INFORMATION_SCHEMA_NAME,
                table_name,
            ),
        })?;

    let DataFrame::DataFusion(dataframe) = query_engine.read_table(table)?;

    // Stage 1: fixed predicates supplied by the caller.
    let dataframe = filters.into_iter().try_fold(dataframe, |df, expr| {
        df.filter(expr).context(error::PlanSqlSnafu)
    })?;

    // Stage 2: `SHOW ... LIKE 'pattern'` filters on the caller-chosen field.
    let dataframe = if let (ShowKind::Like(ident), Some(field)) = (&kind, like_field) {
        dataframe
            .filter(col(field).like(lit(ident.value.clone())))
            .context(error::PlanSqlSnafu)?
    } else {
        dataframe
    };

    // Stage 3: optional ordering.
    let dataframe = if sort.is_empty() {
        dataframe
    } else {
        dataframe.sort(sort).context(error::PlanSqlSnafu)?
    };

    // Stage 4: projection — explicit `select` expressions take precedence
    // over the (column, alias) pairs in `projects`.
    let dataframe = if select.is_empty() {
        if projects.is_empty() {
            dataframe
        } else {
            let projection = projects
                .iter()
                .map(|x| col(x.0).alias(x.1))
                .collect::<Vec<_>>();
            dataframe.select(projection).context(error::PlanSqlSnafu)?
        }
    } else {
        dataframe.select(select).context(error::PlanSqlSnafu)?
    };

    // Rename source columns to their display aliases so a user `WHERE` clause
    // below can reference the user-visible column names.
    let dataframe = projects
        .into_iter()
        .try_fold(dataframe, |df, (column, renamed_column)| {
            df.with_column_renamed(column, renamed_column)
                .context(error::PlanSqlSnafu)
        })?;

    // Stage 5: `SHOW ... WHERE <expr>` plans the user expression against the
    // renamed schema and applies it as a final filter.
    let dataframe = match kind {
        ShowKind::All | ShowKind::Like(_) => {
            // `LIKE` was already handled in stage 2; nothing more to do.
            dataframe
        }
        ShowKind::Where(filter) => {
            // Wrap the dataframe as a view so the expression is planned
            // against the aliased output schema rather than the source table.
            let view = dataframe.into_view();
            let dataframe = SessionContext::new_with_state(
                query_engine
                    .engine_context(query_ctx.clone())
                    .state()
                    .clone(),
            )
            .read_table(view)?;

            let planner = query_engine.planner();
            let planner = planner
                .as_any()
                .downcast_ref::<DfLogicalPlanner>()
                .expect("Must be the datafusion planner");

            let filter = planner
                .sql_to_expr(filter, dataframe.schema(), false, query_ctx)
                .await?;

            dataframe.filter(filter).context(error::PlanSqlSnafu)?
        }
    };

    let stream = dataframe.execute_stream().await?;

    Ok(Output::new_with_stream(Box::pin(
        RecordBatchStreamAdapter::try_new(stream).context(error::CreateRecordBatchSnafu)?,
    )))
}
334
335pub async fn show_columns(
337 stmt: ShowColumns,
338 query_engine: &QueryEngineRef,
339 catalog_manager: &CatalogManagerRef,
340 query_ctx: QueryContextRef,
341) -> Result<Output> {
342 let schema_name = if let Some(database) = stmt.database {
343 database
344 } else {
345 query_ctx.current_schema()
346 };
347
348 let projects = if stmt.full {
349 vec![
350 (columns::COLUMN_NAME, FIELD_COLUMN),
351 (columns::DATA_TYPE, COLUMN_TYPE_COLUMN),
352 (columns::COLLATION_NAME, COLUMN_COLLATION_COLUMN),
353 (columns::IS_NULLABLE, COLUMN_NULLABLE_COLUMN),
354 (columns::COLUMN_KEY, COLUMN_KEY_COLUMN),
355 (columns::COLUMN_DEFAULT, COLUMN_DEFAULT_COLUMN),
356 (columns::COLUMN_COMMENT, COLUMN_COMMENT_COLUMN),
357 (columns::PRIVILEGES, COLUMN_PRIVILEGES_COLUMN),
358 (columns::EXTRA, COLUMN_EXTRA_COLUMN),
359 (columns::GREPTIME_DATA_TYPE, COLUMN_GREPTIME_TYPE_COLUMN),
360 ]
361 } else {
362 vec![
363 (columns::COLUMN_NAME, FIELD_COLUMN),
364 (columns::DATA_TYPE, COLUMN_TYPE_COLUMN),
365 (columns::IS_NULLABLE, COLUMN_NULLABLE_COLUMN),
366 (columns::COLUMN_KEY, COLUMN_KEY_COLUMN),
367 (columns::COLUMN_DEFAULT, COLUMN_DEFAULT_COLUMN),
368 (columns::EXTRA, COLUMN_EXTRA_COLUMN),
369 (columns::GREPTIME_DATA_TYPE, COLUMN_GREPTIME_TYPE_COLUMN),
370 ]
371 };
372
373 let filters = vec![
374 col(columns::TABLE_NAME).eq(lit(&stmt.table)),
375 col(columns::TABLE_SCHEMA).eq(lit(schema_name.clone())),
376 col(columns::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
377 ];
378 let like_field = Some(columns::COLUMN_NAME);
379 let sort = vec![col(columns::COLUMN_NAME).sort(true, true)];
380
381 query_from_information_schema_table(
382 query_engine,
383 catalog_manager,
384 query_ctx,
385 COLUMNS,
386 vec![],
387 projects,
388 filters,
389 like_field,
390 sort,
391 stmt.kind,
392 )
393 .await
394}
395
/// Handles `SHOW INDEX FROM <table>`, emulating MySQL's output shape on top
/// of `information_schema.key_column_usage`.
pub async fn show_index(
    stmt: ShowIndex,
    query_engine: &QueryEngineRef,
    catalog_manager: &CatalogManagerRef,
    query_ctx: QueryContextRef,
) -> Result<Output> {
    // Default to the session's current schema when no database is given.
    let schema_name = if let Some(database) = stmt.database {
        database
    } else {
        query_ctx.current_schema()
    };

    // Build the MySQL-shaped row. Columns GreptimeDB does not track
    // (cardinality, sub_part, packed, expression) are filled with NULL;
    // Non_unique is hardcoded to 1 and collation to "A".
    let select = vec![
        col(key_column_usage::TABLE_NAME).alias(INDEX_TABLE_COLUMN),
        lit(1).alias(INDEX_NONT_UNIQUE_COLUMN),
        col(key_column_usage::CONSTRAINT_NAME).alias(INDEX_KEY_NAME_COLUMN),
        col(key_column_usage::ORDINAL_POSITION).alias(INDEX_SEQ_IN_INDEX_COLUMN),
        col(key_column_usage::COLUMN_NAME).alias(INDEX_COLUMN_NAME_COLUMN),
        lit("A").alias(COLUMN_COLLATION_COLUMN),
        null().alias(INDEX_CARDINALITY_COLUMN),
        null().alias(INDEX_SUB_PART_COLUMN),
        null().alias(INDEX_PACKED_COLUMN),
        // "Null" column: the TIME INDEX constraint column is NOT NULL; all
        // other key columns are reported as nullable.
        case(col(key_column_usage::CONSTRAINT_NAME))
            .when(lit(TIME_INDEX), lit(NO_STR))
            .otherwise(lit(YES_STR))
            .context(error::PlanSqlSnafu)?
            .alias(COLUMN_NULLABLE_COLUMN),
        col(key_column_usage::GREPTIME_INDEX_TYPE).alias(INDEX_INDEX_TYPE_COLUMN),
        lit("").alias(COLUMN_COMMENT_COLUMN),
        lit("").alias(INDEX_COMMENT_COLUMN),
        lit(YES_STR).alias(INDEX_VISIBLE_COLUMN),
        null().alias(INDEX_EXPRESSION_COLUMN),
    ];

    // (source column, display alias) rename pairs; literal-backed columns
    // already carry their display name and map to themselves.
    let projects = vec![
        (key_column_usage::TABLE_NAME, INDEX_TABLE_COLUMN),
        (INDEX_NONT_UNIQUE_COLUMN, INDEX_NONT_UNIQUE_COLUMN),
        (key_column_usage::CONSTRAINT_NAME, INDEX_KEY_NAME_COLUMN),
        (
            key_column_usage::ORDINAL_POSITION,
            INDEX_SEQ_IN_INDEX_COLUMN,
        ),
        (key_column_usage::COLUMN_NAME, INDEX_COLUMN_NAME_COLUMN),
        (COLUMN_COLLATION_COLUMN, COLUMN_COLLATION_COLUMN),
        (INDEX_CARDINALITY_COLUMN, INDEX_CARDINALITY_COLUMN),
        (INDEX_SUB_PART_COLUMN, INDEX_SUB_PART_COLUMN),
        (INDEX_PACKED_COLUMN, INDEX_PACKED_COLUMN),
        (COLUMN_NULLABLE_COLUMN, COLUMN_NULLABLE_COLUMN),
        (
            key_column_usage::GREPTIME_INDEX_TYPE,
            INDEX_INDEX_TYPE_COLUMN,
        ),
        (COLUMN_COMMENT_COLUMN, COLUMN_COMMENT_COLUMN),
        (INDEX_COMMENT_COLUMN, INDEX_COMMENT_COLUMN),
        (INDEX_VISIBLE_COLUMN, INDEX_VISIBLE_COLUMN),
        (INDEX_EXPRESSION_COLUMN, INDEX_EXPRESSION_COLUMN),
    ];

    // Scope to the requested table inside the current catalog.
    let filters = vec![
        col(key_column_usage::TABLE_NAME).eq(lit(&stmt.table)),
        col(key_column_usage::TABLE_SCHEMA).eq(lit(schema_name.clone())),
        col(key_column_usage::REAL_TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
    ];
    let like_field = None;
    // NOTE(review): sorts by `columns::COLUMN_NAME` — presumably its string
    // value matches `key_column_usage::COLUMN_NAME` ("column_name"); confirm,
    // otherwise this sort references the wrong table's constant.
    let sort = vec![col(columns::COLUMN_NAME).sort(true, true)];

    query_from_information_schema_table(
        query_engine,
        catalog_manager,
        query_ctx,
        KEY_COLUMN_USAGE,
        select,
        projects,
        filters,
        like_field,
        sort,
        stmt.kind,
    )
    .await
}
483
484pub async fn show_region(
486 stmt: ShowRegion,
487 query_engine: &QueryEngineRef,
488 catalog_manager: &CatalogManagerRef,
489 query_ctx: QueryContextRef,
490) -> Result<Output> {
491 let schema_name = if let Some(database) = stmt.database {
492 database
493 } else {
494 query_ctx.current_schema()
495 };
496
497 let filters = vec![
498 col(region_peers::TABLE_NAME).eq(lit(&stmt.table)),
499 col(region_peers::TABLE_SCHEMA).eq(lit(schema_name.clone())),
500 col(region_peers::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
501 ];
502 let projects = vec![
503 (region_peers::TABLE_NAME, "Table"),
504 (region_peers::REGION_ID, "Region"),
505 (region_peers::PEER_ID, "Peer"),
506 (region_peers::IS_LEADER, "Leader"),
507 ];
508
509 let like_field = None;
510 let sort = vec![
511 col(columns::REGION_ID).sort(true, true),
512 col(columns::PEER_ID).sort(true, true),
513 ];
514
515 query_from_information_schema_table(
516 query_engine,
517 catalog_manager,
518 query_ctx,
519 REGION_PEERS,
520 vec![],
521 projects,
522 filters,
523 like_field,
524 sort,
525 stmt.kind,
526 )
527 .await
528}
529
530pub async fn show_tables(
532 stmt: ShowTables,
533 query_engine: &QueryEngineRef,
534 catalog_manager: &CatalogManagerRef,
535 query_ctx: QueryContextRef,
536) -> Result<Output> {
537 let schema_name = if let Some(database) = stmt.database {
538 database
539 } else {
540 query_ctx.current_schema()
541 };
542
543 let projects = if stmt.full {
546 vec![
547 (tables::TABLE_NAME, TABLES_COLUMN),
548 (tables::TABLE_TYPE, TABLE_TYPE_COLUMN),
549 ]
550 } else {
551 vec![(tables::TABLE_NAME, TABLES_COLUMN)]
552 };
553 let filters = vec![
554 col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
555 col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
556 ];
557 let like_field = Some(tables::TABLE_NAME);
558 let sort = vec![col(tables::TABLE_NAME).sort(true, true)];
559
560 query_from_information_schema_table(
561 query_engine,
562 catalog_manager,
563 query_ctx,
564 TABLES,
565 vec![],
566 projects,
567 filters,
568 like_field,
569 sort,
570 stmt.kind,
571 )
572 .await
573}
574
/// Handles `SHOW TABLE STATUS`, exposing per-table statistics from
/// `information_schema.tables` under MySQL-compatible column names.
pub async fn show_table_status(
    stmt: ShowTableStatus,
    query_engine: &QueryEngineRef,
    catalog_manager: &CatalogManagerRef,
    query_ctx: QueryContextRef,
) -> Result<Output> {
    // Default to the session's current schema when no database is given.
    let schema_name = if let Some(database) = stmt.database {
        database
    } else {
        query_ctx.current_schema()
    };

    // Map information_schema.tables columns onto MySQL's
    // `SHOW TABLE STATUS` output columns.
    let projects = vec![
        (tables::TABLE_NAME, "Name"),
        (tables::ENGINE, "Engine"),
        (tables::VERSION, "Version"),
        (tables::ROW_FORMAT, "Row_format"),
        (tables::TABLE_ROWS, "Rows"),
        (tables::AVG_ROW_LENGTH, "Avg_row_length"),
        (tables::DATA_LENGTH, "Data_length"),
        (tables::MAX_DATA_LENGTH, "Max_data_length"),
        (tables::INDEX_LENGTH, "Index_length"),
        (tables::DATA_FREE, "Data_free"),
        (tables::AUTO_INCREMENT, "Auto_increment"),
        (tables::CREATE_TIME, "Create_time"),
        (tables::UPDATE_TIME, "Update_time"),
        (tables::CHECK_TIME, "Check_time"),
        (tables::TABLE_COLLATION, "Collation"),
        (tables::CHECKSUM, "Checksum"),
        (tables::CREATE_OPTIONS, "Create_options"),
        (tables::TABLE_COMMENT, "Comment"),
    ];

    // Scope to the target schema inside the current catalog.
    let filters = vec![
        col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
        col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
    ];
    // `LIKE` patterns match the table name; rows sort by table name.
    let like_field = Some(tables::TABLE_NAME);
    let sort = vec![col(tables::TABLE_NAME).sort(true, true)];

    query_from_information_schema_table(
        query_engine,
        catalog_manager,
        query_ctx,
        TABLES,
        vec![],
        projects,
        filters,
        like_field,
        sort,
        stmt.kind,
    )
    .await
}
631
632pub async fn show_collations(
634 kind: ShowKind,
635 query_engine: &QueryEngineRef,
636 catalog_manager: &CatalogManagerRef,
637 query_ctx: QueryContextRef,
638) -> Result<Output> {
639 let projects = vec![
641 ("collation_name", "Collation"),
642 ("character_set_name", "Charset"),
643 ("id", "Id"),
644 ("is_default", "Default"),
645 ("is_compiled", "Compiled"),
646 ("sortlen", "Sortlen"),
647 ];
648
649 let filters = vec![];
650 let like_field = Some("collation_name");
651 let sort = vec![];
652
653 query_from_information_schema_table(
654 query_engine,
655 catalog_manager,
656 query_ctx,
657 COLLATIONS,
658 vec![],
659 projects,
660 filters,
661 like_field,
662 sort,
663 kind,
664 )
665 .await
666}
667
668pub async fn show_charsets(
670 kind: ShowKind,
671 query_engine: &QueryEngineRef,
672 catalog_manager: &CatalogManagerRef,
673 query_ctx: QueryContextRef,
674) -> Result<Output> {
675 let projects = vec![
677 ("character_set_name", "Charset"),
678 ("description", "Description"),
679 ("default_collate_name", "Default collation"),
680 ("maxlen", "Maxlen"),
681 ];
682
683 let filters = vec![];
684 let like_field = Some("character_set_name");
685 let sort = vec![];
686
687 query_from_information_schema_table(
688 query_engine,
689 catalog_manager,
690 query_ctx,
691 CHARACTER_SETS,
692 vec![],
693 projects,
694 filters,
695 like_field,
696 sort,
697 kind,
698 )
699 .await
700}
701
702pub fn show_variable(stmt: ShowVariables, query_ctx: QueryContextRef) -> Result<Output> {
703 let variable = stmt.variable.to_string().to_uppercase();
704 let value = match variable.as_str() {
705 "SYSTEM_TIME_ZONE" | "SYSTEM_TIMEZONE" => get_timezone(None).to_string(),
706 "TIME_ZONE" | "TIMEZONE" => query_ctx.timezone().to_string(),
707 "READ_PREFERENCE" => query_ctx.read_preference().to_string(),
708 "DATESTYLE" => {
709 let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style();
710 format!("{}, {}", style, order)
711 }
712 "MAX_EXECUTION_TIME" => {
713 if query_ctx.channel() == Channel::Mysql {
714 query_ctx.query_timeout_as_millis().to_string()
715 } else {
716 return UnsupportedVariableSnafu { name: variable }.fail();
717 }
718 }
719 "STATEMENT_TIMEOUT" => {
720 if query_ctx.channel() == Channel::Postgres {
722 let mut timeout = query_ctx.query_timeout_as_millis().to_string();
723 timeout.push_str("ms");
724 timeout
725 } else {
726 return UnsupportedVariableSnafu { name: variable }.fail();
727 }
728 }
729 _ => return UnsupportedVariableSnafu { name: variable }.fail(),
730 };
731 let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
732 variable,
733 ConcreteDataType::string_datatype(),
734 false,
735 )]));
736 let records = RecordBatches::try_from_columns(
737 schema,
738 vec![Arc::new(StringVector::from(vec![value])) as _],
739 )
740 .context(error::CreateRecordBatchSnafu)?;
741 Ok(Output::new_with_record_batches(records))
742}
743
744pub async fn show_status(_query_ctx: QueryContextRef) -> Result<Output> {
745 let schema = Arc::new(Schema::new(vec![
746 ColumnSchema::new("Variable_name", ConcreteDataType::string_datatype(), false),
747 ColumnSchema::new("Value", ConcreteDataType::string_datatype(), true),
748 ]));
749 let records = RecordBatches::try_from_columns(
750 schema,
751 vec![
752 Arc::new(StringVector::from(Vec::<&str>::new())) as _,
753 Arc::new(StringVector::from(Vec::<&str>::new())) as _,
754 ],
755 )
756 .context(error::CreateRecordBatchSnafu)?;
757 Ok(Output::new_with_record_batches(records))
758}
759
760pub async fn show_search_path(_query_ctx: QueryContextRef) -> Result<Output> {
761 let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
762 "search_path",
763 ConcreteDataType::string_datatype(),
764 false,
765 )]));
766 let records = RecordBatches::try_from_columns(
767 schema,
768 vec![Arc::new(StringVector::from(vec![_query_ctx.current_schema()])) as _],
769 )
770 .context(error::CreateRecordBatchSnafu)?;
771 Ok(Output::new_with_record_batches(records))
772}
773
774pub fn show_create_database(database_name: &str, options: OptionMap) -> Result<Output> {
775 let stmt = CreateDatabase {
776 name: ObjectName::from(vec![Ident::new(database_name)]),
777 if_not_exists: true,
778 options,
779 };
780 let sql = format!("{stmt}");
781 let columns = vec![
782 Arc::new(StringVector::from(vec![database_name.to_string()])) as _,
783 Arc::new(StringVector::from(vec![sql])) as _,
784 ];
785 let records =
786 RecordBatches::try_from_columns(SHOW_CREATE_DATABASE_OUTPUT_SCHEMA.clone(), columns)
787 .context(error::CreateRecordBatchSnafu)?;
788 Ok(Output::new_with_record_batches(records))
789}
790
791pub fn show_create_table(
792 table: TableRef,
793 schema_options: Option<SchemaOptions>,
794 partitions: Option<Partitions>,
795 query_ctx: QueryContextRef,
796) -> Result<Output> {
797 let table_info = table.table_info();
798 let table_name = &table_info.name;
799
800 let quote_style = query_ctx.quote_style();
801
802 let mut stmt = create_table_stmt(&table_info, schema_options, quote_style)?;
803 stmt.partitions = partitions.map(|mut p| {
804 p.set_quote(quote_style);
805 p
806 });
807 let sql = format!("{}", stmt);
808 let columns = vec![
809 Arc::new(StringVector::from(vec![table_name.clone()])) as _,
810 Arc::new(StringVector::from(vec![sql])) as _,
811 ];
812 let records = RecordBatches::try_from_columns(SHOW_CREATE_TABLE_OUTPUT_SCHEMA.clone(), columns)
813 .context(error::CreateRecordBatchSnafu)?;
814
815 Ok(Output::new_with_record_batches(records))
816}
817
/// Builds a PostgreSQL `CREATE FOREIGN TABLE` statement for the given table,
/// so PostgreSQL clients can mount it through a foreign data wrapper.
///
/// Columns are rendered with their PostgreSQL type names; metric-engine
/// internal columns are omitted.
pub fn show_create_foreign_table_for_pg(
    table: TableRef,
    _query_ctx: QueryContextRef,
) -> Result<Output> {
    let table_info = table.table_info();

    let table_meta = &table_info.meta;
    let table_name = &table_info.name;
    let schema = &table_info.meta.schema;
    let is_metric_engine = is_metric_engine(&table_meta.engine);

    // Render `"name" pg_type` per column, skipping metric-engine internals.
    let columns = schema
        .column_schemas()
        .iter()
        .filter_map(|c| {
            if is_metric_engine && is_metric_engine_internal_column(&c.name) {
                None
            } else {
                Some(format!(
                    "\"{}\" {}",
                    c.name,
                    c.data_type.postgres_datatype_name()
                ))
            }
        })
        .join(",\n ");

    // The foreign table is prefixed "ft_" and points back at the original
    // table via the server-side `table_name` option.
    let sql = format!(
        r#"CREATE FOREIGN TABLE ft_{} (
 {}
)
SERVER greptimedb
OPTIONS (table_name '{}')"#,
        table_name, columns, table_name
    );

    let columns = vec![
        Arc::new(StringVector::from(vec![table_name.clone()])) as _,
        Arc::new(StringVector::from(vec![sql])) as _,
    ];
    let records = RecordBatches::try_from_columns(SHOW_CREATE_TABLE_OUTPUT_SCHEMA.clone(), columns)
        .context(error::CreateRecordBatchSnafu)?;

    Ok(Output::new_with_record_batches(records))
}
863
/// Handles `SHOW CREATE VIEW`, reconstructing the `CREATE VIEW` statement
/// from the view's stored SQL definition.
pub fn show_create_view(
    view_name: ObjectName,
    definition: &str,
    query_ctx: QueryContextRef,
) -> Result<Output> {
    // Re-parse the stored definition with the session's SQL dialect.
    let mut parser_ctx =
        ParserContext::new(query_ctx.sql_dialect(), definition).context(error::SqlSnafu)?;

    let Statement::CreateView(create_view) =
        parser_ctx.parse_statement().context(error::SqlSnafu)?
    else {
        // The stored definition is presumed to originate from a CREATE VIEW
        // statement, so re-parsing it must yield one.
        unreachable!();
    };

    // Rebuild the statement under the requested view name, keeping the
    // parsed columns, query and flags.
    let stmt = CreateView {
        name: view_name.clone(),
        columns: create_view.columns,
        query: create_view.query,
        or_replace: create_view.or_replace,
        if_not_exists: create_view.if_not_exists,
    };

    let sql = format!("{}", stmt);
    let columns = vec![
        Arc::new(StringVector::from(vec![view_name.to_string()])) as _,
        Arc::new(StringVector::from(vec![sql])) as _,
    ];
    let records = RecordBatches::try_from_columns(SHOW_CREATE_VIEW_OUTPUT_SCHEMA.clone(), columns)
        .context(error::CreateRecordBatchSnafu)?;

    Ok(Output::new_with_record_batches(records))
}
897
898pub async fn show_views(
900 stmt: ShowViews,
901 query_engine: &QueryEngineRef,
902 catalog_manager: &CatalogManagerRef,
903 query_ctx: QueryContextRef,
904) -> Result<Output> {
905 let schema_name = if let Some(database) = stmt.database {
906 database
907 } else {
908 query_ctx.current_schema()
909 };
910
911 let projects = vec![(tables::TABLE_NAME, VIEWS_COLUMN)];
912 let filters = vec![
913 col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
914 col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
915 ];
916 let like_field = Some(tables::TABLE_NAME);
917 let sort = vec![col(tables::TABLE_NAME).sort(true, true)];
918
919 query_from_information_schema_table(
920 query_engine,
921 catalog_manager,
922 query_ctx,
923 VIEWS,
924 vec![],
925 projects,
926 filters,
927 like_field,
928 sort,
929 stmt.kind,
930 )
931 .await
932}
933
934pub async fn show_flows(
936 stmt: ShowFlows,
937 query_engine: &QueryEngineRef,
938 catalog_manager: &CatalogManagerRef,
939 query_ctx: QueryContextRef,
940) -> Result<Output> {
941 let projects = vec![(flows::FLOW_NAME, FLOWS_COLUMN)];
942 let filters = vec![col(flows::TABLE_CATALOG).eq(lit(query_ctx.current_catalog()))];
943 let like_field = Some(flows::FLOW_NAME);
944 let sort = vec![col(flows::FLOW_NAME).sort(true, true)];
945
946 query_from_information_schema_table(
947 query_engine,
948 catalog_manager,
949 query_ctx,
950 FLOWS,
951 vec![],
952 projects,
953 filters,
954 like_field,
955 sort,
956 stmt.kind,
957 )
958 .await
959}
960
/// Handles `SHOW TRIGGERS` (enterprise only) by listing trigger names from
/// `information_schema.triggers`, sorted by name.
#[cfg(feature = "enterprise")]
pub async fn show_triggers(
    stmt: sql::statements::show::trigger::ShowTriggers,
    query_engine: &QueryEngineRef,
    catalog_manager: &CatalogManagerRef,
    query_ctx: QueryContextRef,
) -> Result<Output> {
    const TRIGGER_NAME: &str = "trigger_name";
    const TRIGGERS_COLUMN: &str = "Triggers";

    query_from_information_schema_table(
        query_engine,
        catalog_manager,
        query_ctx,
        catalog::information_schema::TRIGGERS,
        vec![],
        vec![(TRIGGER_NAME, TRIGGERS_COLUMN)],
        vec![],
        Some(TRIGGER_NAME),
        vec![col(TRIGGER_NAME).sort(true, true)],
        stmt.kind,
    )
    .await
}
989
/// Handles `SHOW CREATE FLOW`, reconstructing a `CREATE FLOW` statement from
/// the stored flow metadata.
pub fn show_create_flow(
    flow_name: ObjectName,
    flow_val: FlowInfoValue,
    query_ctx: QueryContextRef,
) -> Result<Output> {
    // Re-parse the stored task query with the session's SQL dialect.
    let mut parser_ctx =
        ParserContext::new(query_ctx.sql_dialect(), flow_val.raw_sql()).context(error::SqlSnafu)?;

    let query = parser_ctx.parse_statement().context(error::SqlSnafu)?;

    // TQL statements keep their original raw text; SQL statements are
    // re-rendered from the parsed form.
    let raw_query = match &query {
        Statement::Tql(_) => flow_val.raw_sql().clone(),
        _ => query.to_string(),
    };

    let query = Box::new(SqlOrTql::try_from_statement(query, &raw_query).context(error::SqlSnafu)?);

    // Empty comments are omitted from the reconstructed statement.
    let comment = if flow_val.comment().is_empty() {
        None
    } else {
        Some(flow_val.comment().clone())
    };

    let stmt = CreateFlow {
        flow_name,
        sink_table_name: ObjectName::from(vec![Ident::new(&flow_val.sink_table_name().table_name)]),
        // NOTE(review): or_replace/if_not_exists are hardcoded rather than
        // taken from metadata — presumably to emit a replay-safe statement;
        // confirm against flow creation.
        or_replace: false,
        if_not_exists: true,
        expire_after: flow_val.expire_after(),
        comment,
        query,
    };

    let sql = format!("{}", stmt);
    let columns = vec![
        Arc::new(StringVector::from(vec![flow_val.flow_name().clone()])) as _,
        Arc::new(StringVector::from(vec![sql])) as _,
    ];
    let records = RecordBatches::try_from_columns(SHOW_CREATE_FLOW_OUTPUT_SCHEMA.clone(), columns)
        .context(error::CreateRecordBatchSnafu)?;

    Ok(Output::new_with_record_batches(records))
}
1036
1037pub fn describe_table(table: TableRef) -> Result<Output> {
1038 let table_info = table.table_info();
1039 let columns_schemas = table_info.meta.schema.column_schemas();
1040 let columns = vec![
1041 describe_column_names(columns_schemas),
1042 describe_column_types(columns_schemas),
1043 describe_column_keys(columns_schemas, &table_info.meta.primary_key_indices),
1044 describe_column_nullables(columns_schemas),
1045 describe_column_defaults(columns_schemas),
1046 describe_column_semantic_types(columns_schemas, &table_info.meta.primary_key_indices),
1047 ];
1048 let records = RecordBatches::try_from_columns(DESCRIBE_TABLE_OUTPUT_SCHEMA.clone(), columns)
1049 .context(error::CreateRecordBatchSnafu)?;
1050 Ok(Output::new_with_record_batches(records))
1051}
1052
1053fn describe_column_names(columns_schemas: &[ColumnSchema]) -> VectorRef {
1054 Arc::new(StringVector::from_iterator(
1055 columns_schemas.iter().map(|cs| cs.name.as_str()),
1056 ))
1057}
1058
1059fn describe_column_types(columns_schemas: &[ColumnSchema]) -> VectorRef {
1060 Arc::new(StringVector::from(
1061 columns_schemas
1062 .iter()
1063 .map(|cs| cs.data_type.name())
1064 .collect::<Vec<_>>(),
1065 ))
1066}
1067
1068fn describe_column_keys(
1069 columns_schemas: &[ColumnSchema],
1070 primary_key_indices: &[usize],
1071) -> VectorRef {
1072 Arc::new(StringVector::from_iterator(
1073 columns_schemas.iter().enumerate().map(|(i, cs)| {
1074 if cs.is_time_index() || primary_key_indices.contains(&i) {
1075 PRI_KEY
1076 } else {
1077 ""
1078 }
1079 }),
1080 ))
1081}
1082
1083fn describe_column_nullables(columns_schemas: &[ColumnSchema]) -> VectorRef {
1084 Arc::new(StringVector::from_iterator(columns_schemas.iter().map(
1085 |cs| {
1086 if cs.is_nullable() {
1087 YES_STR
1088 } else {
1089 NO_STR
1090 }
1091 },
1092 )))
1093}
1094
1095fn describe_column_defaults(columns_schemas: &[ColumnSchema]) -> VectorRef {
1096 Arc::new(StringVector::from(
1097 columns_schemas
1098 .iter()
1099 .map(|cs| {
1100 cs.default_constraint()
1101 .map_or(String::from(""), |dc| dc.to_string())
1102 })
1103 .collect::<Vec<String>>(),
1104 ))
1105}
1106
1107fn describe_column_semantic_types(
1108 columns_schemas: &[ColumnSchema],
1109 primary_key_indices: &[usize],
1110) -> VectorRef {
1111 Arc::new(StringVector::from_iterator(
1112 columns_schemas.iter().enumerate().map(|(i, cs)| {
1113 if primary_key_indices.contains(&i) {
1114 SEMANTIC_TYPE_PRIMARY_KEY
1115 } else if cs.is_time_index() {
1116 SEMANTIC_TYPE_TIME_INDEX
1117 } else {
1118 SEMANTIC_TYPE_FIELD
1119 }
1120 }),
1121 ))
1122}
1123
/// Resolves the object-store backend and the list of data files for a file
/// table from its options map.
///
/// Reads the required location option (and the optional filename-pattern
/// regex), builds the backend, then lists the matching non-directory entries.
///
/// # Errors
/// Fails when the location option is missing, the pattern is an invalid
/// regex, the backend cannot be built, or listing fails.
pub async fn prepare_file_table_files(
    options: &HashMap<String, String>,
) -> Result<(ObjectStore, Vec<String>)> {
    let url = options
        .get(FILE_TABLE_LOCATION_KEY)
        .context(error::MissingRequiredFieldSnafu {
            name: FILE_TABLE_LOCATION_KEY,
        })?;

    // A URL naming a file lists just that file; a directory URL lists its
    // contents.
    let (dir, filename) = find_dir_and_filename(url);
    let source = if let Some(filename) = filename {
        Source::Filename(filename)
    } else {
        Source::Dir
    };
    // Optional regex restricting which listed files are used.
    let regex = options
        .get(FILE_TABLE_PATTERN_KEY)
        .map(|x| Regex::new(x))
        .transpose()
        .context(error::BuildRegexSnafu)?;
    let object_store = build_backend(url, options).context(error::BuildBackendSnafu)?;
    let lister = Lister::new(object_store.clone(), source, dir, regex);
    // Keep only files: directory entries end with '/' and are dropped.
    let files = lister
        .list()
        .await
        .context(error::ListObjectsSnafu)?
        .into_iter()
        .filter_map(|entry| {
            if entry.path().ends_with('/') {
                None
            } else {
                Some(entry.path().to_string())
            }
        })
        .collect::<Vec<_>>();
    Ok((object_store, files))
}
1166
1167pub async fn infer_file_table_schema(
1168 object_store: &ObjectStore,
1169 files: &[String],
1170 options: &HashMap<String, String>,
1171) -> Result<RawSchema> {
1172 let format = parse_file_table_format(options)?;
1173 let merged = infer_schemas(object_store, files, format.as_ref())
1174 .await
1175 .context(error::InferSchemaSnafu)?;
1176 Ok(RawSchema::from(
1177 &Schema::try_from(merged).context(error::ConvertSchemaSnafu)?,
1178 ))
1179}
1180
1181pub fn file_column_schemas_to_table(
1191 file_column_schemas: &[ColumnSchema],
1192) -> (Vec<ColumnSchema>, String) {
1193 let mut column_schemas = file_column_schemas.to_owned();
1194 if let Some(time_index_column) = column_schemas.iter().find(|c| c.is_time_index()) {
1195 let time_index = time_index_column.name.clone();
1196 return (column_schemas, time_index);
1197 }
1198
1199 let timestamp_type = ConcreteDataType::timestamp_millisecond_datatype();
1200 let default_zero = Value::Timestamp(Timestamp::new_millisecond(0));
1201 let timestamp_column_schema = ColumnSchema::new(GREPTIME_TIMESTAMP, timestamp_type, false)
1202 .with_time_index(true)
1203 .with_default_constraint(Some(ColumnDefaultConstraint::Value(default_zero)))
1204 .unwrap();
1205
1206 if let Some(column_schema) = column_schemas
1207 .iter_mut()
1208 .find(|column_schema| column_schema.name == GREPTIME_TIMESTAMP)
1209 {
1210 *column_schema = timestamp_column_schema;
1212 } else {
1213 column_schemas.push(timestamp_column_schema);
1214 }
1215
1216 (column_schemas, GREPTIME_TIMESTAMP.to_string())
1217}
1218
1219pub fn check_file_to_table_schema_compatibility(
1228 file_column_schemas: &[ColumnSchema],
1229 table_column_schemas: &[ColumnSchema],
1230) -> Result<()> {
1231 let file_schemas_map = file_column_schemas
1232 .iter()
1233 .map(|s| (s.name.clone(), s))
1234 .collect::<HashMap<_, _>>();
1235
1236 for table_column in table_column_schemas {
1237 if let Some(file_column) = file_schemas_map.get(&table_column.name) {
1238 ensure!(
1240 file_column
1241 .data_type
1242 .can_arrow_type_cast_to(&table_column.data_type),
1243 error::ColumnSchemaIncompatibleSnafu {
1244 column: table_column.name.clone(),
1245 file_type: file_column.data_type.clone(),
1246 table_type: table_column.data_type.clone(),
1247 }
1248 );
1249 } else {
1250 ensure!(
1251 table_column.is_nullable() || table_column.default_constraint().is_some(),
1252 error::ColumnSchemaNoDefaultSnafu {
1253 column: table_column.name.clone(),
1254 }
1255 );
1256 }
1257 }
1258
1259 Ok(())
1260}
1261
1262fn parse_file_table_format(options: &HashMap<String, String>) -> Result<Box<dyn FileFormat>> {
1263 Ok(
1264 match Format::try_from(options).context(error::ParseFileFormatSnafu)? {
1265 Format::Csv(format) => Box::new(format),
1266 Format::Json(format) => Box::new(format),
1267 Format::Parquet(format) => Box::new(format),
1268 Format::Orc(format) => Box::new(format),
1269 },
1270 )
1271}
1272
1273pub async fn show_processlist(
1274 stmt: ShowProcessList,
1275 query_engine: &QueryEngineRef,
1276 catalog_manager: &CatalogManagerRef,
1277 query_ctx: QueryContextRef,
1278) -> Result<Output> {
1279 let projects = if stmt.full {
1280 vec![
1281 (process_list::ID, "Id"),
1282 (process_list::CATALOG, "Catalog"),
1283 (process_list::SCHEMAS, "Schema"),
1284 (process_list::CLIENT, "Client"),
1285 (process_list::FRONTEND, "Frontend"),
1286 (process_list::START_TIMESTAMP, "Start Time"),
1287 (process_list::ELAPSED_TIME, "Elapsed Time"),
1288 (process_list::QUERY, "Query"),
1289 ]
1290 } else {
1291 vec![
1292 (process_list::ID, "Id"),
1293 (process_list::CATALOG, "Catalog"),
1294 (process_list::QUERY, "Query"),
1295 (process_list::ELAPSED_TIME, "Elapsed Time"),
1296 ]
1297 };
1298
1299 let filters = vec![];
1300 let like_field = None;
1301 let sort = vec![col("id").sort(true, true)];
1302 query_from_information_schema_table(
1303 query_engine,
1304 catalog_manager,
1305 query_ctx.clone(),
1306 "process_list",
1307 vec![],
1308 projects.clone(),
1309 filters,
1310 like_field,
1311 sort,
1312 ShowKind::All,
1313 )
1314 .await
1315}
1316
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use common_query::{Output, OutputData};
    use common_recordbatch::{RecordBatch, RecordBatches};
    use common_time::timestamp::TimeUnit;
    use common_time::Timezone;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema, SchemaRef};
    use datatypes::vectors::{StringVector, TimestampMillisecondVector, UInt32Vector, VectorRef};
    use session::context::QueryContextBuilder;
    use snafu::ResultExt;
    use sql::ast::{Ident, ObjectName};
    use sql::statements::show::ShowVariables;
    use table::test_util::MemTable;
    use table::TableRef;

    use super::show_variable;
    use crate::error;
    use crate::error::Result;
    use crate::sql::{
        describe_table, DESCRIBE_TABLE_OUTPUT_SCHEMA, NO_STR, SEMANTIC_TYPE_FIELD,
        SEMANTIC_TYPE_TIME_INDEX, YES_STR,
    };

    /// `DESC TABLE` on a two-column table (nullable field + time index with a
    /// default) must produce one output row per column with the expected
    /// Field/Type/Key/Null/Default/Semantic-Type values.
    #[test]
    fn test_describe_table_multiple_columns() -> Result<()> {
        let table_name = "test_table";
        let schema = vec![
            ColumnSchema::new("t1", ConcreteDataType::uint32_datatype(), true),
            ColumnSchema::new(
                "t2",
                ConcreteDataType::timestamp_datatype(TimeUnit::Millisecond),
                false,
            )
            .with_default_constraint(Some(ColumnDefaultConstraint::Function(String::from(
                "current_timestamp()",
            ))))
            .unwrap()
            .with_time_index(true),
        ];
        // One data row; the values themselves are irrelevant to DESC output.
        let data = vec![
            Arc::new(UInt32Vector::from_slice([0])) as _,
            Arc::new(TimestampMillisecondVector::from_slice([0])) as _,
        ];
        // Expected DESC columns, in output order:
        // Field, Type, Key, Null, Default, Semantic Type.
        let expected_columns = vec![
            Arc::new(StringVector::from(vec!["t1", "t2"])) as _,
            Arc::new(StringVector::from(vec!["UInt32", "TimestampMillisecond"])) as _,
            Arc::new(StringVector::from(vec!["", "PRI"])) as _,
            Arc::new(StringVector::from(vec![YES_STR, NO_STR])) as _,
            Arc::new(StringVector::from(vec!["", "current_timestamp()"])) as _,
            Arc::new(StringVector::from(vec![
                SEMANTIC_TYPE_FIELD,
                SEMANTIC_TYPE_TIME_INDEX,
            ])) as _,
        ];

        describe_table_test_by_schema(table_name, schema, data, expected_columns)
    }

    /// Builds an in-memory table from `schema`/`data`, runs `describe_table`
    /// on it, and asserts the result equals `expected_columns`.
    fn describe_table_test_by_schema(
        table_name: &str,
        schema: Vec<ColumnSchema>,
        data: Vec<VectorRef>,
        expected_columns: Vec<VectorRef>,
    ) -> Result<()> {
        let table_schema = SchemaRef::new(Schema::new(schema));
        let table = prepare_describe_table(table_name, table_schema, data);

        let expected =
            RecordBatches::try_from_columns(DESCRIBE_TABLE_OUTPUT_SCHEMA.clone(), expected_columns)
                .context(error::CreateRecordBatchSnafu)?;

        if let OutputData::RecordBatches(res) = describe_table(table)?.data {
            assert_eq!(res.take(), expected.take());
        } else {
            panic!("describe table must return record batch");
        }

        Ok(())
    }

    /// Wraps a single record batch in a `MemTable` for `describe_table` tests.
    fn prepare_describe_table(
        table_name: &str,
        table_schema: SchemaRef,
        data: Vec<VectorRef>,
    ) -> TableRef {
        let record_batch = RecordBatch::new(table_schema, data).unwrap();
        MemTable::table(table_name, record_batch)
    }

    /// System timezone variables report UTC regardless of the session timezone;
    /// session timezone variables report the session setting; names with spaces
    /// are rejected.
    #[test]
    fn test_show_variable() {
        assert_eq!(
            exec_show_variable("SYSTEM_TIME_ZONE", "Asia/Shanghai").unwrap(),
            "UTC"
        );
        assert_eq!(
            exec_show_variable("SYSTEM_TIMEZONE", "Asia/Shanghai").unwrap(),
            "UTC"
        );
        assert_eq!(
            exec_show_variable("TIME_ZONE", "Asia/Shanghai").unwrap(),
            "Asia/Shanghai"
        );
        assert_eq!(
            exec_show_variable("TIMEZONE", "Asia/Shanghai").unwrap(),
            "Asia/Shanghai"
        );
        assert!(exec_show_variable("TIME ZONE", "Asia/Shanghai").is_err());
        assert!(exec_show_variable("SYSTEM TIME ZONE", "Asia/Shanghai").is_err());
    }

    /// Runs `SHOW VARIABLES <variable>` under a session with timezone `tz` and
    /// returns the single value from the first result row.
    fn exec_show_variable(variable: &str, tz: &str) -> Result<String> {
        let stmt = ShowVariables {
            variable: ObjectName::from(vec![Ident::new(variable)]),
        };
        let ctx = Arc::new(
            QueryContextBuilder::default()
                .timezone(Timezone::from_tz_string(tz).unwrap())
                .build(),
        );
        match show_variable(stmt, ctx) {
            Ok(Output {
                data: OutputData::RecordBatches(record),
                ..
            }) => {
                // Exactly one batch with one column is expected for SHOW VARIABLES.
                let record = record.take().first().cloned().unwrap();
                let data = record.column(0);
                Ok(data.get(0).to_string())
            }
            Ok(_) => unreachable!(),
            Err(e) => Err(e),
        }
    }
}