1mod show_create_table;
16
17use std::collections::HashMap;
18use std::ops::ControlFlow;
19use std::sync::Arc;
20
21use catalog::CatalogManagerRef;
22use catalog::information_schema::{
23 CHARACTER_SETS, COLLATIONS, COLUMNS, FLOWS, REGION_PEERS, SCHEMATA, STATISTICS, TABLES, VIEWS,
24 columns, flows, process_list, region_peers, schemata, statistics, tables,
25};
26use common_catalog::consts::{
27 INFORMATION_SCHEMA_NAME, SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_PRIMARY_KEY,
28 SEMANTIC_TYPE_TIME_INDEX,
29};
30use common_catalog::format_full_table_name;
31use common_datasource::file_format::{FileFormat, Format, infer_schemas};
32use common_datasource::lister::{Lister, Source};
33use common_datasource::object_store::build_backend;
34use common_datasource::util::find_dir_and_filename;
35use common_meta::SchemaOptions;
36use common_meta::ddl::create_flow::FlowType;
37use common_meta::key::flow::flow_info::FlowInfoValue;
38use common_query::Output;
39use common_query::prelude::greptime_timestamp;
40use common_recordbatch::RecordBatches;
41use common_recordbatch::adapter::RecordBatchStreamAdapter;
42use common_time::Timestamp;
43use common_time::timezone::get_timezone;
44use datafusion::prelude::SessionContext;
45use datafusion_expr::{Expr, SortExpr, col, lit};
46use datatypes::prelude::*;
47use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema};
48use datatypes::vectors::StringVector;
49use itertools::Itertools;
50use object_store::ObjectStore;
51use once_cell::sync::Lazy;
52use regex::Regex;
53use session::context::{Channel, QueryContextRef};
54pub use show_create_table::create_table_stmt;
55use snafu::{OptionExt, ResultExt, ensure};
56use sql::ast::{Ident, visit_expressions_mut};
57use sql::parser::ParserContext;
58use sql::statements::OptionMap;
59use sql::statements::create::{CreateDatabase, CreateFlow, CreateView, Partitions, SqlOrTql};
60use sql::statements::show::{
61 ShowColumns, ShowDatabases, ShowFlows, ShowIndex, ShowKind, ShowProcessList, ShowRegion,
62 ShowTableStatus, ShowTables, ShowVariables, ShowViews,
63};
64use sql::statements::statement::Statement;
65use sqlparser::ast::ObjectName;
66use store_api::metric_engine_consts::{is_metric_engine, is_metric_engine_internal_column};
67use table::TableRef;
68use table::metadata::TableInfoRef;
69use table::requests::{FILE_TABLE_LOCATION_KEY, FILE_TABLE_PATTERN_KEY};
70
71use crate::QueryEngineRef;
72use crate::error::{self, Result, UnsupportedVariableSnafu};
73use crate::planner::DfLogicalPlanner;
74
75const SCHEMAS_COLUMN: &str = "Database";
76const OPTIONS_COLUMN: &str = "Options";
77const VIEWS_COLUMN: &str = "Views";
78const FLOWS_COLUMN: &str = "Flows";
79const FIELD_COLUMN: &str = "Field";
80const TABLE_TYPE_COLUMN: &str = "Table_type";
81
82const COLUMN_NAME_COLUMN: &str = "Column";
83const COLUMN_GREPTIME_TYPE_COLUMN: &str = "Greptime_type";
84const COLUMN_TYPE_COLUMN: &str = "Type";
85const COLUMN_KEY_COLUMN: &str = "Key";
86const COLUMN_EXTRA_COLUMN: &str = "Extra";
87const COLUMN_PRIVILEGES_COLUMN: &str = "Privileges";
88const COLUMN_COLLATION_COLUMN: &str = "Collation";
89const COLUMN_NULLABLE_COLUMN: &str = "Null";
90const COLUMN_DEFAULT_COLUMN: &str = "Default";
91const COLUMN_COMMENT_COLUMN: &str = "Comment";
92const COLUMN_SEMANTIC_TYPE_COLUMN: &str = "Semantic Type";
93
94const YES_STR: &str = "YES";
95const NO_STR: &str = "NO";
96const PRI_KEY: &str = "PRI";
97
98const INDEX_TABLE_COLUMN: &str = "Table";
100const INDEX_NONT_UNIQUE_COLUMN: &str = "Non_unique";
101const INDEX_CARDINALITY_COLUMN: &str = "Cardinality";
102const INDEX_SUB_PART_COLUMN: &str = "Sub_part";
103const INDEX_PACKED_COLUMN: &str = "Packed";
104const INDEX_INDEX_TYPE_COLUMN: &str = "Index_type";
105const INDEX_COMMENT_COLUMN: &str = "Index_comment";
106const INDEX_VISIBLE_COLUMN: &str = "Visible";
107const INDEX_EXPRESSION_COLUMN: &str = "Expression";
108const INDEX_KEY_NAME_COLUMN: &str = "Key_name";
109const INDEX_SEQ_IN_INDEX_COLUMN: &str = "Seq_in_index";
110const INDEX_COLUMN_NAME_COLUMN: &str = "Column_name";
111
112static DESCRIBE_TABLE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
113 Arc::new(Schema::new(vec![
114 ColumnSchema::new(
115 COLUMN_NAME_COLUMN,
116 ConcreteDataType::string_datatype(),
117 false,
118 ),
119 ColumnSchema::new(
120 COLUMN_TYPE_COLUMN,
121 ConcreteDataType::string_datatype(),
122 false,
123 ),
124 ColumnSchema::new(COLUMN_KEY_COLUMN, ConcreteDataType::string_datatype(), true),
125 ColumnSchema::new(
126 COLUMN_NULLABLE_COLUMN,
127 ConcreteDataType::string_datatype(),
128 false,
129 ),
130 ColumnSchema::new(
131 COLUMN_DEFAULT_COLUMN,
132 ConcreteDataType::string_datatype(),
133 false,
134 ),
135 ColumnSchema::new(
136 COLUMN_SEMANTIC_TYPE_COLUMN,
137 ConcreteDataType::string_datatype(),
138 false,
139 ),
140 ]))
141});
142
143static SHOW_CREATE_DATABASE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
144 Arc::new(Schema::new(vec![
145 ColumnSchema::new("Database", ConcreteDataType::string_datatype(), false),
146 ColumnSchema::new(
147 "Create Database",
148 ConcreteDataType::string_datatype(),
149 false,
150 ),
151 ]))
152});
153
154static SHOW_CREATE_TABLE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
155 Arc::new(Schema::new(vec![
156 ColumnSchema::new("Table", ConcreteDataType::string_datatype(), false),
157 ColumnSchema::new("Create Table", ConcreteDataType::string_datatype(), false),
158 ]))
159});
160
161static SHOW_CREATE_FLOW_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
162 Arc::new(Schema::new(vec![
163 ColumnSchema::new("Flow", ConcreteDataType::string_datatype(), false),
164 ColumnSchema::new("Create Flow", ConcreteDataType::string_datatype(), false),
165 ]))
166});
167
168static SHOW_CREATE_VIEW_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
169 Arc::new(Schema::new(vec![
170 ColumnSchema::new("View", ConcreteDataType::string_datatype(), false),
171 ColumnSchema::new("Create View", ConcreteDataType::string_datatype(), false),
172 ]))
173});
174
175pub async fn show_databases(
176 stmt: ShowDatabases,
177 query_engine: &QueryEngineRef,
178 catalog_manager: &CatalogManagerRef,
179 query_ctx: QueryContextRef,
180) -> Result<Output> {
181 let projects = if stmt.full {
182 vec![
183 (schemata::SCHEMA_NAME, SCHEMAS_COLUMN),
184 (schemata::SCHEMA_OPTS, OPTIONS_COLUMN),
185 ]
186 } else {
187 vec![(schemata::SCHEMA_NAME, SCHEMAS_COLUMN)]
188 };
189
190 let filters = vec![col(schemata::CATALOG_NAME).eq(lit(query_ctx.current_catalog()))];
191 let like_field = Some(schemata::SCHEMA_NAME);
192 let sort = vec![col(schemata::SCHEMA_NAME).sort(true, true)];
193
194 query_from_information_schema_table(
195 query_engine,
196 catalog_manager,
197 query_ctx,
198 SCHEMATA,
199 vec![],
200 projects,
201 filters,
202 like_field,
203 sort,
204 stmt.kind,
205 )
206 .await
207}
208
209fn replace_column_in_expr(expr: &mut sqlparser::ast::Expr, from_column: &str, to_column: &str) {
212 let _ = visit_expressions_mut(expr, |e| {
213 match e {
214 sqlparser::ast::Expr::Identifier(ident)
215 if ident.value.eq_ignore_ascii_case(from_column) =>
216 {
217 ident.value = to_column.to_string();
218 }
219 sqlparser::ast::Expr::CompoundIdentifier(idents) => {
220 if let Some(last) = idents.last_mut()
221 && last.value.eq_ignore_ascii_case(from_column)
222 {
223 last.value = to_column.to_string();
224 }
225 }
226 _ => {}
227 }
228 ControlFlow::<()>::Continue(())
229 });
230}
231
232#[allow(clippy::too_many_arguments)]
240async fn query_from_information_schema_table(
241 query_engine: &QueryEngineRef,
242 catalog_manager: &CatalogManagerRef,
243 query_ctx: QueryContextRef,
244 table_name: &str,
245 select: Vec<Expr>,
246 projects: Vec<(&str, &str)>,
247 filters: Vec<Expr>,
248 like_field: Option<&str>,
249 sort: Vec<SortExpr>,
250 kind: ShowKind,
251) -> Result<Output> {
252 let table = catalog_manager
253 .table(
254 query_ctx.current_catalog(),
255 INFORMATION_SCHEMA_NAME,
256 table_name,
257 Some(&query_ctx),
258 )
259 .await
260 .context(error::CatalogSnafu)?
261 .with_context(|| error::TableNotFoundSnafu {
262 table: format_full_table_name(
263 query_ctx.current_catalog(),
264 INFORMATION_SCHEMA_NAME,
265 table_name,
266 ),
267 })?;
268
269 let dataframe = query_engine.read_table(table)?;
270
271 let dataframe = filters.into_iter().try_fold(dataframe, |df, expr| {
273 df.filter(expr).context(error::PlanSqlSnafu)
274 })?;
275
276 let dataframe = if let (ShowKind::Like(ident), Some(field)) = (&kind, like_field) {
278 dataframe
279 .filter(col(field).like(lit(ident.value.clone())))
280 .context(error::PlanSqlSnafu)?
281 } else {
282 dataframe
283 };
284
285 let dataframe = if sort.is_empty() {
287 dataframe
288 } else {
289 dataframe.sort(sort).context(error::PlanSqlSnafu)?
290 };
291
292 let dataframe = if select.is_empty() {
294 if projects.is_empty() {
295 dataframe
296 } else {
297 let projection = projects
298 .iter()
299 .map(|x| col(x.0).alias(x.1))
300 .collect::<Vec<_>>();
301 dataframe.select(projection).context(error::PlanSqlSnafu)?
302 }
303 } else {
304 dataframe.select(select).context(error::PlanSqlSnafu)?
305 };
306
307 let dataframe = projects
309 .into_iter()
310 .try_fold(dataframe, |df, (column, renamed_column)| {
311 df.with_column_renamed(column, renamed_column)
312 .context(error::PlanSqlSnafu)
313 })?;
314
315 let dataframe = match kind {
316 ShowKind::All | ShowKind::Like(_) => {
317 dataframe
319 }
320 ShowKind::Where(filter) => {
321 let view = dataframe.into_view();
324 let dataframe = SessionContext::new_with_state(
325 query_engine
326 .engine_context(query_ctx.clone())
327 .state()
328 .clone(),
329 )
330 .read_table(view)?;
331
332 let planner = query_engine.planner();
333 let planner = planner
334 .as_any()
335 .downcast_ref::<DfLogicalPlanner>()
336 .expect("Must be the datafusion planner");
337
338 let filter = planner
339 .sql_to_expr(filter, dataframe.schema(), false, query_ctx)
340 .await?;
341
342 dataframe.filter(filter).context(error::PlanSqlSnafu)?
344 }
345 };
346
347 let stream = dataframe.execute_stream().await?;
348
349 Ok(Output::new_with_stream(Box::pin(
350 RecordBatchStreamAdapter::try_new(stream).context(error::CreateRecordBatchSnafu)?,
351 )))
352}
353
354pub async fn show_columns(
356 stmt: ShowColumns,
357 query_engine: &QueryEngineRef,
358 catalog_manager: &CatalogManagerRef,
359 query_ctx: QueryContextRef,
360) -> Result<Output> {
361 let schema_name = if let Some(database) = stmt.database {
362 database
363 } else {
364 query_ctx.current_schema()
365 };
366
367 let projects = if stmt.full {
368 vec![
369 (columns::COLUMN_NAME, FIELD_COLUMN),
370 (columns::DATA_TYPE, COLUMN_TYPE_COLUMN),
371 (columns::COLLATION_NAME, COLUMN_COLLATION_COLUMN),
372 (columns::IS_NULLABLE, COLUMN_NULLABLE_COLUMN),
373 (columns::COLUMN_KEY, COLUMN_KEY_COLUMN),
374 (columns::COLUMN_DEFAULT, COLUMN_DEFAULT_COLUMN),
375 (columns::COLUMN_COMMENT, COLUMN_COMMENT_COLUMN),
376 (columns::PRIVILEGES, COLUMN_PRIVILEGES_COLUMN),
377 (columns::EXTRA, COLUMN_EXTRA_COLUMN),
378 (columns::GREPTIME_DATA_TYPE, COLUMN_GREPTIME_TYPE_COLUMN),
379 ]
380 } else {
381 vec![
382 (columns::COLUMN_NAME, FIELD_COLUMN),
383 (columns::DATA_TYPE, COLUMN_TYPE_COLUMN),
384 (columns::IS_NULLABLE, COLUMN_NULLABLE_COLUMN),
385 (columns::COLUMN_KEY, COLUMN_KEY_COLUMN),
386 (columns::COLUMN_DEFAULT, COLUMN_DEFAULT_COLUMN),
387 (columns::EXTRA, COLUMN_EXTRA_COLUMN),
388 (columns::GREPTIME_DATA_TYPE, COLUMN_GREPTIME_TYPE_COLUMN),
389 ]
390 };
391
392 let filters = vec![
393 col(columns::TABLE_NAME).eq(lit(&stmt.table)),
394 col(columns::TABLE_SCHEMA).eq(lit(schema_name.clone())),
395 col(columns::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
396 ];
397 let like_field = Some(columns::COLUMN_NAME);
398 let sort = vec![col(columns::COLUMN_NAME).sort(true, true)];
399
400 query_from_information_schema_table(
401 query_engine,
402 catalog_manager,
403 query_ctx,
404 COLUMNS,
405 vec![],
406 projects,
407 filters,
408 like_field,
409 sort,
410 stmt.kind,
411 )
412 .await
413}
414
415pub async fn show_index(
417 stmt: ShowIndex,
418 query_engine: &QueryEngineRef,
419 catalog_manager: &CatalogManagerRef,
420 query_ctx: QueryContextRef,
421) -> Result<Output> {
422 let schema_name = if let Some(database) = stmt.database {
423 database
424 } else {
425 query_ctx.current_schema()
426 };
427
428 let select = vec![
429 col(statistics::TABLE_NAME).alias(INDEX_TABLE_COLUMN),
430 col(statistics::NON_UNIQUE).alias(INDEX_NONT_UNIQUE_COLUMN),
431 col(statistics::INDEX_NAME).alias(INDEX_KEY_NAME_COLUMN),
432 col(statistics::SEQ_IN_INDEX).alias(INDEX_SEQ_IN_INDEX_COLUMN),
433 col(statistics::COLUMN_NAME).alias(INDEX_COLUMN_NAME_COLUMN),
434 col(statistics::COLLATION).alias(COLUMN_COLLATION_COLUMN),
435 col(statistics::CARDINALITY).alias(INDEX_CARDINALITY_COLUMN),
436 col(statistics::SUB_PART).alias(INDEX_SUB_PART_COLUMN),
437 col(statistics::PACKED).alias(INDEX_PACKED_COLUMN),
438 col(statistics::NULLABLE).alias(COLUMN_NULLABLE_COLUMN),
439 col(statistics::INDEX_TYPE).alias(INDEX_INDEX_TYPE_COLUMN),
440 col(statistics::COMMENT).alias(COLUMN_COMMENT_COLUMN),
441 col(statistics::INDEX_COMMENT).alias(INDEX_COMMENT_COLUMN),
442 col(statistics::IS_VISIBLE).alias(INDEX_VISIBLE_COLUMN),
443 col(statistics::EXPRESSION).alias(INDEX_EXPRESSION_COLUMN),
444 ];
445
446 let projects = vec![
447 (statistics::TABLE_NAME, INDEX_TABLE_COLUMN),
448 (INDEX_NONT_UNIQUE_COLUMN, INDEX_NONT_UNIQUE_COLUMN),
449 (statistics::INDEX_NAME, INDEX_KEY_NAME_COLUMN),
450 (statistics::SEQ_IN_INDEX, INDEX_SEQ_IN_INDEX_COLUMN),
451 (statistics::COLUMN_NAME, INDEX_COLUMN_NAME_COLUMN),
452 (COLUMN_COLLATION_COLUMN, COLUMN_COLLATION_COLUMN),
453 (INDEX_CARDINALITY_COLUMN, INDEX_CARDINALITY_COLUMN),
454 (INDEX_SUB_PART_COLUMN, INDEX_SUB_PART_COLUMN),
455 (INDEX_PACKED_COLUMN, INDEX_PACKED_COLUMN),
456 (COLUMN_NULLABLE_COLUMN, COLUMN_NULLABLE_COLUMN),
457 (statistics::INDEX_TYPE, INDEX_INDEX_TYPE_COLUMN),
458 (COLUMN_COMMENT_COLUMN, COLUMN_COMMENT_COLUMN),
459 (INDEX_COMMENT_COLUMN, INDEX_COMMENT_COLUMN),
460 (INDEX_VISIBLE_COLUMN, INDEX_VISIBLE_COLUMN),
461 (INDEX_EXPRESSION_COLUMN, INDEX_EXPRESSION_COLUMN),
462 ];
463
464 let filters = vec![
465 col(statistics::TABLE_NAME).eq(lit(&stmt.table)),
466 col(statistics::TABLE_SCHEMA).eq(lit(schema_name.clone())),
467 ];
468 let like_field = None;
469 let sort = vec![
470 col(statistics::INDEX_NAME).sort(true, true),
471 col(statistics::SEQ_IN_INDEX).sort(true, true),
472 ];
473
474 query_from_information_schema_table(
475 query_engine,
476 catalog_manager,
477 query_ctx,
478 STATISTICS,
479 select,
480 projects,
481 filters,
482 like_field,
483 sort,
484 stmt.kind,
485 )
486 .await
487}
488
489pub async fn show_region(
491 stmt: ShowRegion,
492 query_engine: &QueryEngineRef,
493 catalog_manager: &CatalogManagerRef,
494 query_ctx: QueryContextRef,
495) -> Result<Output> {
496 let schema_name = if let Some(database) = stmt.database {
497 database
498 } else {
499 query_ctx.current_schema()
500 };
501
502 let filters = vec![
503 col(region_peers::TABLE_NAME).eq(lit(&stmt.table)),
504 col(region_peers::TABLE_SCHEMA).eq(lit(schema_name.clone())),
505 col(region_peers::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
506 ];
507 let projects = vec![
508 (region_peers::TABLE_NAME, "Table"),
509 (region_peers::REGION_ID, "Region"),
510 (region_peers::PEER_ID, "Peer"),
511 (region_peers::IS_LEADER, "Leader"),
512 ];
513
514 let like_field = None;
515 let sort = vec![
516 col(columns::REGION_ID).sort(true, true),
517 col(columns::PEER_ID).sort(true, true),
518 ];
519
520 query_from_information_schema_table(
521 query_engine,
522 catalog_manager,
523 query_ctx,
524 REGION_PEERS,
525 vec![],
526 projects,
527 filters,
528 like_field,
529 sort,
530 stmt.kind,
531 )
532 .await
533}
534
535pub async fn show_tables(
537 stmt: ShowTables,
538 query_engine: &QueryEngineRef,
539 catalog_manager: &CatalogManagerRef,
540 query_ctx: QueryContextRef,
541) -> Result<Output> {
542 let schema_name = if let Some(database) = stmt.database {
543 database
544 } else {
545 query_ctx.current_schema()
546 };
547
548 let tables_column = format!("Tables_in_{}", schema_name);
550 let projects = if stmt.full {
551 vec![
552 (tables::TABLE_NAME, tables_column.as_str()),
553 (tables::TABLE_TYPE, TABLE_TYPE_COLUMN),
554 ]
555 } else {
556 vec![(tables::TABLE_NAME, tables_column.as_str())]
557 };
558 let filters = vec![
559 col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
560 col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
561 ];
562 let like_field = Some(tables::TABLE_NAME);
563 let sort = vec![col(tables::TABLE_NAME).sort(true, true)];
564
565 let kind = match stmt.kind {
568 ShowKind::Where(mut filter) => {
569 replace_column_in_expr(&mut filter, "Tables", &tables_column);
570 ShowKind::Where(filter)
571 }
572 other => other,
573 };
574
575 query_from_information_schema_table(
576 query_engine,
577 catalog_manager,
578 query_ctx,
579 TABLES,
580 vec![],
581 projects,
582 filters,
583 like_field,
584 sort,
585 kind,
586 )
587 .await
588}
589
590pub async fn show_table_status(
592 stmt: ShowTableStatus,
593 query_engine: &QueryEngineRef,
594 catalog_manager: &CatalogManagerRef,
595 query_ctx: QueryContextRef,
596) -> Result<Output> {
597 let schema_name = if let Some(database) = stmt.database {
598 database
599 } else {
600 query_ctx.current_schema()
601 };
602
603 let projects = vec![
605 (tables::TABLE_NAME, "Name"),
606 (tables::ENGINE, "Engine"),
607 (tables::VERSION, "Version"),
608 (tables::ROW_FORMAT, "Row_format"),
609 (tables::TABLE_ROWS, "Rows"),
610 (tables::AVG_ROW_LENGTH, "Avg_row_length"),
611 (tables::DATA_LENGTH, "Data_length"),
612 (tables::MAX_DATA_LENGTH, "Max_data_length"),
613 (tables::INDEX_LENGTH, "Index_length"),
614 (tables::DATA_FREE, "Data_free"),
615 (tables::AUTO_INCREMENT, "Auto_increment"),
616 (tables::CREATE_TIME, "Create_time"),
617 (tables::UPDATE_TIME, "Update_time"),
618 (tables::CHECK_TIME, "Check_time"),
619 (tables::TABLE_COLLATION, "Collation"),
620 (tables::CHECKSUM, "Checksum"),
621 (tables::CREATE_OPTIONS, "Create_options"),
622 (tables::TABLE_COMMENT, "Comment"),
623 ];
624
625 let filters = vec![
626 col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
627 col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
628 ];
629 let like_field = Some(tables::TABLE_NAME);
630 let sort = vec![col(tables::TABLE_NAME).sort(true, true)];
631
632 query_from_information_schema_table(
633 query_engine,
634 catalog_manager,
635 query_ctx,
636 TABLES,
637 vec![],
638 projects,
639 filters,
640 like_field,
641 sort,
642 stmt.kind,
643 )
644 .await
645}
646
647pub async fn show_collations(
649 kind: ShowKind,
650 query_engine: &QueryEngineRef,
651 catalog_manager: &CatalogManagerRef,
652 query_ctx: QueryContextRef,
653) -> Result<Output> {
654 let projects = vec![
656 ("collation_name", "Collation"),
657 ("character_set_name", "Charset"),
658 ("id", "Id"),
659 ("is_default", "Default"),
660 ("is_compiled", "Compiled"),
661 ("sortlen", "Sortlen"),
662 ];
663
664 let filters = vec![];
665 let like_field = Some("collation_name");
666 let sort = vec![];
667
668 query_from_information_schema_table(
669 query_engine,
670 catalog_manager,
671 query_ctx,
672 COLLATIONS,
673 vec![],
674 projects,
675 filters,
676 like_field,
677 sort,
678 kind,
679 )
680 .await
681}
682
683pub async fn show_charsets(
685 kind: ShowKind,
686 query_engine: &QueryEngineRef,
687 catalog_manager: &CatalogManagerRef,
688 query_ctx: QueryContextRef,
689) -> Result<Output> {
690 let projects = vec![
692 ("character_set_name", "Charset"),
693 ("description", "Description"),
694 ("default_collate_name", "Default collation"),
695 ("maxlen", "Maxlen"),
696 ];
697
698 let filters = vec![];
699 let like_field = Some("character_set_name");
700 let sort = vec![];
701
702 query_from_information_schema_table(
703 query_engine,
704 catalog_manager,
705 query_ctx,
706 CHARACTER_SETS,
707 vec![],
708 projects,
709 filters,
710 like_field,
711 sort,
712 kind,
713 )
714 .await
715}
716
717pub fn show_variable(stmt: ShowVariables, query_ctx: QueryContextRef) -> Result<Output> {
718 let variable = stmt.variable.to_string().to_uppercase();
719 let value = match variable.as_str() {
720 "SYSTEM_TIME_ZONE" | "SYSTEM_TIMEZONE" => get_timezone(None).to_string(),
721 "TIME_ZONE" | "TIMEZONE" => query_ctx.timezone().to_string(),
722 "READ_PREFERENCE" => query_ctx.read_preference().to_string(),
723 "DATESTYLE" => {
724 let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style();
725 format!("{}, {}", style, order)
726 }
727 "INTERVALSTYLE" => {
728 let style = *query_ctx
729 .configuration_parameter()
730 .pg_intervalstyle_format();
731 style.to_string()
732 }
733 "MAX_EXECUTION_TIME"
734 if query_ctx.channel() == Channel::Mysql => {
735 query_ctx.query_timeout_as_millis().to_string()
736 }
737 "STATEMENT_TIMEOUT"
738 if query_ctx.channel() == Channel::Postgres => {
740 let mut timeout = query_ctx.query_timeout_as_millis().to_string();
741 timeout.push_str("ms");
742 timeout
743 }
744 _ => return UnsupportedVariableSnafu { name: variable }.fail(),
745 };
746 let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
747 variable,
748 ConcreteDataType::string_datatype(),
749 false,
750 )]));
751 let records = RecordBatches::try_from_columns(
752 schema,
753 vec![Arc::new(StringVector::from(vec![value])) as _],
754 )
755 .context(error::CreateRecordBatchSnafu)?;
756 Ok(Output::new_with_record_batches(records))
757}
758
759pub async fn show_status(_query_ctx: QueryContextRef) -> Result<Output> {
760 let schema = Arc::new(Schema::new(vec![
761 ColumnSchema::new("Variable_name", ConcreteDataType::string_datatype(), false),
762 ColumnSchema::new("Value", ConcreteDataType::string_datatype(), true),
763 ]));
764 let records = RecordBatches::try_from_columns(
765 schema,
766 vec![
767 Arc::new(StringVector::from(Vec::<&str>::new())) as _,
768 Arc::new(StringVector::from(Vec::<&str>::new())) as _,
769 ],
770 )
771 .context(error::CreateRecordBatchSnafu)?;
772 Ok(Output::new_with_record_batches(records))
773}
774
775pub async fn show_search_path(_query_ctx: QueryContextRef) -> Result<Output> {
776 let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
777 "search_path",
778 ConcreteDataType::string_datatype(),
779 false,
780 )]));
781 let records = RecordBatches::try_from_columns(
782 schema,
783 vec![Arc::new(StringVector::from(vec![_query_ctx.current_schema()])) as _],
784 )
785 .context(error::CreateRecordBatchSnafu)?;
786 Ok(Output::new_with_record_batches(records))
787}
788
789pub fn show_create_database(database_name: &str, options: OptionMap) -> Result<Output> {
790 let stmt = CreateDatabase {
791 name: ObjectName::from(vec![Ident::new(database_name)]),
792 if_not_exists: true,
793 options,
794 };
795 let sql = format!("{stmt}");
796 let columns = vec![
797 Arc::new(StringVector::from(vec![database_name.to_string()])) as _,
798 Arc::new(StringVector::from(vec![sql])) as _,
799 ];
800 let records =
801 RecordBatches::try_from_columns(SHOW_CREATE_DATABASE_OUTPUT_SCHEMA.clone(), columns)
802 .context(error::CreateRecordBatchSnafu)?;
803 Ok(Output::new_with_record_batches(records))
804}
805
806pub fn show_create_table(
807 table_info: TableInfoRef,
808 schema_options: Option<SchemaOptions>,
809 partitions: Option<Partitions>,
810 query_ctx: QueryContextRef,
811) -> Result<Output> {
812 let table_name = table_info.name.clone();
813
814 let quote_style = query_ctx.quote_style();
815
816 let mut stmt = create_table_stmt(&table_info, schema_options, quote_style)?;
817 stmt.partitions = partitions.map(|mut p| {
818 p.set_quote(quote_style);
819 p
820 });
821 let sql = format!("{}", stmt);
822 let columns = vec![
823 Arc::new(StringVector::from(vec![table_name])) as _,
824 Arc::new(StringVector::from(vec![sql])) as _,
825 ];
826 let records = RecordBatches::try_from_columns(SHOW_CREATE_TABLE_OUTPUT_SCHEMA.clone(), columns)
827 .context(error::CreateRecordBatchSnafu)?;
828
829 Ok(Output::new_with_record_batches(records))
830}
831
832pub fn show_create_foreign_table_for_pg(
833 table: TableRef,
834 _query_ctx: QueryContextRef,
835) -> Result<Output> {
836 let table_info = table.table_info();
837
838 let table_meta = &table_info.meta;
839 let table_name = &table_info.name;
840 let schema = &table_info.meta.schema;
841 let is_metric_engine = is_metric_engine(&table_meta.engine);
842
843 let columns = schema
844 .column_schemas()
845 .iter()
846 .filter_map(|c| {
847 if is_metric_engine && is_metric_engine_internal_column(&c.name) {
848 None
849 } else {
850 Some(format!(
851 "\"{}\" {}",
852 c.name,
853 c.data_type.postgres_datatype_name()
854 ))
855 }
856 })
857 .join(",\n ");
858
859 let sql = format!(
860 r#"CREATE FOREIGN TABLE ft_{} (
861 {}
862)
863SERVER greptimedb
864OPTIONS (table_name '{}')"#,
865 table_name, columns, table_name
866 );
867
868 let columns = vec![
869 Arc::new(StringVector::from(vec![table_name.clone()])) as _,
870 Arc::new(StringVector::from(vec![sql])) as _,
871 ];
872 let records = RecordBatches::try_from_columns(SHOW_CREATE_TABLE_OUTPUT_SCHEMA.clone(), columns)
873 .context(error::CreateRecordBatchSnafu)?;
874
875 Ok(Output::new_with_record_batches(records))
876}
877
878pub fn show_create_view(
879 view_name: ObjectName,
880 definition: &str,
881 query_ctx: QueryContextRef,
882) -> Result<Output> {
883 let mut parser_ctx =
884 ParserContext::new(query_ctx.sql_dialect(), definition).context(error::SqlSnafu)?;
885
886 let Statement::CreateView(create_view) =
887 parser_ctx.parse_statement().context(error::SqlSnafu)?
888 else {
889 unreachable!();
891 };
892
893 let stmt = CreateView {
894 name: view_name.clone(),
895 columns: create_view.columns,
896 query: create_view.query,
897 or_replace: create_view.or_replace,
898 if_not_exists: create_view.if_not_exists,
899 };
900
901 let sql = format!("{}", stmt);
902 let columns = vec![
903 Arc::new(StringVector::from(vec![view_name.to_string()])) as _,
904 Arc::new(StringVector::from(vec![sql])) as _,
905 ];
906 let records = RecordBatches::try_from_columns(SHOW_CREATE_VIEW_OUTPUT_SCHEMA.clone(), columns)
907 .context(error::CreateRecordBatchSnafu)?;
908
909 Ok(Output::new_with_record_batches(records))
910}
911
912pub async fn show_views(
914 stmt: ShowViews,
915 query_engine: &QueryEngineRef,
916 catalog_manager: &CatalogManagerRef,
917 query_ctx: QueryContextRef,
918) -> Result<Output> {
919 let schema_name = if let Some(database) = stmt.database {
920 database
921 } else {
922 query_ctx.current_schema()
923 };
924
925 let projects = vec![(tables::TABLE_NAME, VIEWS_COLUMN)];
926 let filters = vec![
927 col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
928 col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
929 ];
930 let like_field = Some(tables::TABLE_NAME);
931 let sort = vec![col(tables::TABLE_NAME).sort(true, true)];
932
933 query_from_information_schema_table(
934 query_engine,
935 catalog_manager,
936 query_ctx,
937 VIEWS,
938 vec![],
939 projects,
940 filters,
941 like_field,
942 sort,
943 stmt.kind,
944 )
945 .await
946}
947
948pub async fn show_flows(
950 stmt: ShowFlows,
951 query_engine: &QueryEngineRef,
952 catalog_manager: &CatalogManagerRef,
953 query_ctx: QueryContextRef,
954) -> Result<Output> {
955 let projects = vec![(flows::FLOW_NAME, FLOWS_COLUMN)];
956 let filters = vec![col(flows::TABLE_CATALOG).eq(lit(query_ctx.current_catalog()))];
957 let like_field = Some(flows::FLOW_NAME);
958 let sort = vec![col(flows::FLOW_NAME).sort(true, true)];
959
960 query_from_information_schema_table(
961 query_engine,
962 catalog_manager,
963 query_ctx,
964 FLOWS,
965 vec![],
966 projects,
967 filters,
968 like_field,
969 sort,
970 stmt.kind,
971 )
972 .await
973}
974
975#[cfg(feature = "enterprise")]
976pub async fn show_triggers(
977 stmt: sql::statements::show::trigger::ShowTriggers,
978 query_engine: &QueryEngineRef,
979 catalog_manager: &CatalogManagerRef,
980 query_ctx: QueryContextRef,
981) -> Result<Output> {
982 const TRIGGER_NAME: &str = "trigger_name";
983 const TRIGGERS_COLUMN: &str = "Triggers";
984
985 let projects = vec![(TRIGGER_NAME, TRIGGERS_COLUMN)];
986 let like_field = Some(TRIGGER_NAME);
987 let sort = vec![col(TRIGGER_NAME).sort(true, true)];
988
989 query_from_information_schema_table(
990 query_engine,
991 catalog_manager,
992 query_ctx,
993 catalog::information_schema::TRIGGERS,
994 vec![],
995 projects,
996 vec![],
997 like_field,
998 sort,
999 stmt.kind,
1000 )
1001 .await
1002}
1003
1004pub fn show_create_flow(
1005 flow_name: ObjectName,
1006 flow_val: FlowInfoValue,
1007 query_ctx: QueryContextRef,
1008) -> Result<Output> {
1009 let mut parser_ctx =
1010 ParserContext::new(query_ctx.sql_dialect(), flow_val.raw_sql()).context(error::SqlSnafu)?;
1011
1012 let query = parser_ctx.parse_statement().context(error::SqlSnafu)?;
1013
1014 let raw_query = match &query {
1016 Statement::Tql(_) => flow_val.raw_sql().clone(),
1017 _ => query.to_string(),
1018 };
1019
1020 let query = Box::new(SqlOrTql::try_from_statement(query, &raw_query).context(error::SqlSnafu)?);
1021
1022 let comment = if flow_val.comment().is_empty() {
1023 None
1024 } else {
1025 Some(flow_val.comment().clone())
1026 };
1027
1028 let stmt = CreateFlow {
1029 flow_name,
1030 sink_table_name: ObjectName::from(vec![
1031 Ident::new(&flow_val.sink_table_name().schema_name),
1032 Ident::new(&flow_val.sink_table_name().table_name),
1033 ]),
1034 or_replace: false,
1037 if_not_exists: true,
1038 expire_after: flow_val.expire_after(),
1039 eval_interval: flow_val.eval_interval(),
1040 comment,
1041 flow_options: OptionMap::from_filtered_string_map(
1042 flow_val.options(),
1043 &[FlowType::FLOW_TYPE_KEY],
1044 ),
1045 query,
1046 };
1047
1048 let sql = format!("{}", stmt);
1049 let columns = vec![
1050 Arc::new(StringVector::from(vec![flow_val.flow_name().clone()])) as _,
1051 Arc::new(StringVector::from(vec![sql])) as _,
1052 ];
1053 let records = RecordBatches::try_from_columns(SHOW_CREATE_FLOW_OUTPUT_SCHEMA.clone(), columns)
1054 .context(error::CreateRecordBatchSnafu)?;
1055
1056 Ok(Output::new_with_record_batches(records))
1057}
1058
1059pub fn describe_table(table: TableRef) -> Result<Output> {
1060 let table_info = table.table_info();
1061 let columns_schemas = table_info.meta.schema.column_schemas();
1062 let columns = vec![
1063 describe_column_names(columns_schemas),
1064 describe_column_types(columns_schemas),
1065 describe_column_keys(columns_schemas, &table_info.meta.primary_key_indices),
1066 describe_column_nullables(columns_schemas),
1067 describe_column_defaults(columns_schemas),
1068 describe_column_semantic_types(columns_schemas, &table_info.meta.primary_key_indices),
1069 ];
1070 let records = RecordBatches::try_from_columns(DESCRIBE_TABLE_OUTPUT_SCHEMA.clone(), columns)
1071 .context(error::CreateRecordBatchSnafu)?;
1072 Ok(Output::new_with_record_batches(records))
1073}
1074
1075fn describe_column_names(columns_schemas: &[ColumnSchema]) -> VectorRef {
1076 Arc::new(StringVector::from_iterator(
1077 columns_schemas.iter().map(|cs| cs.name.as_str()),
1078 ))
1079}
1080
1081fn describe_column_types(columns_schemas: &[ColumnSchema]) -> VectorRef {
1082 Arc::new(StringVector::from(
1083 columns_schemas
1084 .iter()
1085 .map(|cs| cs.data_type.name())
1086 .collect::<Vec<_>>(),
1087 ))
1088}
1089
1090fn describe_column_keys(
1091 columns_schemas: &[ColumnSchema],
1092 primary_key_indices: &[usize],
1093) -> VectorRef {
1094 Arc::new(StringVector::from_iterator(
1095 columns_schemas.iter().enumerate().map(|(i, cs)| {
1096 if cs.is_time_index() || primary_key_indices.contains(&i) {
1097 PRI_KEY
1098 } else {
1099 ""
1100 }
1101 }),
1102 ))
1103}
1104
1105fn describe_column_nullables(columns_schemas: &[ColumnSchema]) -> VectorRef {
1106 Arc::new(StringVector::from_iterator(columns_schemas.iter().map(
1107 |cs| {
1108 if cs.is_nullable() { YES_STR } else { NO_STR }
1109 },
1110 )))
1111}
1112
1113fn describe_column_defaults(columns_schemas: &[ColumnSchema]) -> VectorRef {
1114 Arc::new(StringVector::from(
1115 columns_schemas
1116 .iter()
1117 .map(|cs| {
1118 cs.default_constraint()
1119 .map_or(String::from(""), |dc| dc.to_string())
1120 })
1121 .collect::<Vec<String>>(),
1122 ))
1123}
1124
1125fn describe_column_semantic_types(
1126 columns_schemas: &[ColumnSchema],
1127 primary_key_indices: &[usize],
1128) -> VectorRef {
1129 Arc::new(StringVector::from_iterator(
1130 columns_schemas.iter().enumerate().map(|(i, cs)| {
1131 if primary_key_indices.contains(&i) {
1132 SEMANTIC_TYPE_PRIMARY_KEY
1133 } else if cs.is_time_index() {
1134 SEMANTIC_TYPE_TIME_INDEX
1135 } else {
1136 SEMANTIC_TYPE_FIELD
1137 }
1138 }),
1139 ))
1140}
1141
1142pub async fn prepare_file_table_files(
1144 options: &HashMap<String, String>,
1145) -> Result<(ObjectStore, Vec<String>)> {
1146 let url = options
1147 .get(FILE_TABLE_LOCATION_KEY)
1148 .context(error::MissingRequiredFieldSnafu {
1149 name: FILE_TABLE_LOCATION_KEY,
1150 })?;
1151
1152 let (dir, filename) = find_dir_and_filename(url);
1153 let source = if let Some(filename) = filename {
1154 Source::Filename(filename)
1155 } else {
1156 Source::Dir
1157 };
1158 let regex = options
1159 .get(FILE_TABLE_PATTERN_KEY)
1160 .map(|x| Regex::new(x))
1161 .transpose()
1162 .context(error::BuildRegexSnafu)?;
1163 let object_store = build_backend(url, options).context(error::BuildBackendSnafu)?;
1164 let lister = Lister::new(object_store.clone(), source, dir, regex);
1165 let files = lister
1170 .list()
1171 .await
1172 .context(error::ListObjectsSnafu)?
1173 .into_iter()
1174 .filter_map(|entry| {
1175 if entry.path().ends_with('/') {
1176 None
1177 } else {
1178 Some(entry.path().to_string())
1179 }
1180 })
1181 .collect::<Vec<_>>();
1182 Ok((object_store, files))
1183}
1184
1185pub async fn infer_file_table_schema(
1186 object_store: &ObjectStore,
1187 files: &[String],
1188 options: &HashMap<String, String>,
1189) -> Result<Schema> {
1190 let format = parse_file_table_format(options)?;
1191 let merged = infer_schemas(object_store, files, format.as_ref())
1192 .await
1193 .context(error::InferSchemaSnafu)?;
1194 Schema::try_from(merged).context(error::ConvertSchemaSnafu)
1195}
1196
1197pub fn file_column_schemas_to_table(
1207 file_column_schemas: &[ColumnSchema],
1208) -> (Vec<ColumnSchema>, String) {
1209 let mut column_schemas = file_column_schemas.to_owned();
1210 if let Some(time_index_column) = column_schemas.iter().find(|c| c.is_time_index()) {
1211 let time_index = time_index_column.name.clone();
1212 return (column_schemas, time_index);
1213 }
1214
1215 let timestamp_type = ConcreteDataType::timestamp_millisecond_datatype();
1216 let default_zero = Value::Timestamp(Timestamp::new_millisecond(0));
1217 let timestamp_column_schema = ColumnSchema::new(greptime_timestamp(), timestamp_type, false)
1218 .with_time_index(true)
1219 .with_default_constraint(Some(ColumnDefaultConstraint::Value(default_zero)))
1220 .unwrap();
1221
1222 if let Some(column_schema) = column_schemas
1223 .iter_mut()
1224 .find(|column_schema| column_schema.name == greptime_timestamp())
1225 {
1226 *column_schema = timestamp_column_schema;
1228 } else {
1229 column_schemas.push(timestamp_column_schema);
1230 }
1231
1232 (column_schemas, greptime_timestamp().to_string())
1233}
1234
1235pub fn check_file_to_table_schema_compatibility(
1244 file_column_schemas: &[ColumnSchema],
1245 table_column_schemas: &[ColumnSchema],
1246) -> Result<()> {
1247 let file_schemas_map = file_column_schemas
1248 .iter()
1249 .map(|s| (s.name.clone(), s))
1250 .collect::<HashMap<_, _>>();
1251
1252 for table_column in table_column_schemas {
1253 if let Some(file_column) = file_schemas_map.get(&table_column.name) {
1254 ensure!(
1256 file_column
1257 .data_type
1258 .can_arrow_type_cast_to(&table_column.data_type),
1259 error::ColumnSchemaIncompatibleSnafu {
1260 column: table_column.name.clone(),
1261 file_type: file_column.data_type.clone(),
1262 table_type: table_column.data_type.clone(),
1263 }
1264 );
1265 } else {
1266 ensure!(
1267 table_column.is_nullable() || table_column.default_constraint().is_some(),
1268 error::ColumnSchemaNoDefaultSnafu {
1269 column: table_column.name.clone(),
1270 }
1271 );
1272 }
1273 }
1274
1275 Ok(())
1276}
1277
1278fn parse_file_table_format(options: &HashMap<String, String>) -> Result<Box<dyn FileFormat>> {
1279 Ok(
1280 match Format::try_from(options).context(error::ParseFileFormatSnafu)? {
1281 Format::Csv(format) => Box::new(format),
1282 Format::Json(format) => Box::new(format),
1283 Format::Parquet(format) => Box::new(format),
1284 Format::Orc(format) => Box::new(format),
1285 },
1286 )
1287}
1288
1289pub async fn show_processlist(
1290 stmt: ShowProcessList,
1291 query_engine: &QueryEngineRef,
1292 catalog_manager: &CatalogManagerRef,
1293 query_ctx: QueryContextRef,
1294) -> Result<Output> {
1295 let projects = if stmt.full {
1296 vec![
1297 (process_list::ID, "Id"),
1298 (process_list::CATALOG, "Catalog"),
1299 (process_list::SCHEMAS, "Schema"),
1300 (process_list::CLIENT, "Client"),
1301 (process_list::FRONTEND, "Frontend"),
1302 (process_list::START_TIMESTAMP, "Start Time"),
1303 (process_list::ELAPSED_TIME, "Elapsed Time"),
1304 (process_list::QUERY, "Query"),
1305 ]
1306 } else {
1307 vec![
1308 (process_list::ID, "Id"),
1309 (process_list::CATALOG, "Catalog"),
1310 (process_list::QUERY, "Query"),
1311 (process_list::ELAPSED_TIME, "Elapsed Time"),
1312 ]
1313 };
1314
1315 let filters = vec![];
1316 let like_field = None;
1317 let sort = vec![col("id").sort(true, true)];
1318 query_from_information_schema_table(
1319 query_engine,
1320 catalog_manager,
1321 query_ctx.clone(),
1322 "process_list",
1323 vec![],
1324 projects.clone(),
1325 filters,
1326 like_field,
1327 sort,
1328 ShowKind::All,
1329 )
1330 .await
1331}
1332
1333#[cfg(test)]
1334mod test {
1335 use std::sync::Arc;
1336
1337 use common_query::{Output, OutputData};
1338 use common_recordbatch::{RecordBatch, RecordBatches};
1339 use common_time::Timezone;
1340 use common_time::timestamp::TimeUnit;
1341 use datatypes::prelude::ConcreteDataType;
1342 use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema, SchemaRef};
1343 use datatypes::vectors::{StringVector, TimestampMillisecondVector, UInt32Vector, VectorRef};
1344 use session::context::QueryContextBuilder;
1345 use snafu::ResultExt;
1346 use sql::ast::{Ident, ObjectName};
1347 use sql::statements::show::ShowVariables;
1348 use table::TableRef;
1349 use table::test_util::MemTable;
1350
1351 use super::show_variable;
1352 use crate::error;
1353 use crate::error::Result;
1354 use crate::sql::{
1355 DESCRIBE_TABLE_OUTPUT_SCHEMA, NO_STR, SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_TIME_INDEX,
1356 YES_STR, describe_table,
1357 };
1358
1359 #[test]
1360 fn test_describe_table_multiple_columns() -> Result<()> {
1361 let table_name = "test_table";
1362 let schema = vec![
1363 ColumnSchema::new("t1", ConcreteDataType::uint32_datatype(), true),
1364 ColumnSchema::new(
1365 "t2",
1366 ConcreteDataType::timestamp_datatype(TimeUnit::Millisecond),
1367 false,
1368 )
1369 .with_default_constraint(Some(ColumnDefaultConstraint::Function(String::from(
1370 "current_timestamp()",
1371 ))))
1372 .unwrap()
1373 .with_time_index(true),
1374 ];
1375 let data = vec![
1376 Arc::new(UInt32Vector::from_slice([0])) as _,
1377 Arc::new(TimestampMillisecondVector::from_slice([0])) as _,
1378 ];
1379 let expected_columns = vec![
1380 Arc::new(StringVector::from(vec!["t1", "t2"])) as _,
1381 Arc::new(StringVector::from(vec!["UInt32", "TimestampMillisecond"])) as _,
1382 Arc::new(StringVector::from(vec!["", "PRI"])) as _,
1383 Arc::new(StringVector::from(vec![YES_STR, NO_STR])) as _,
1384 Arc::new(StringVector::from(vec!["", "current_timestamp()"])) as _,
1385 Arc::new(StringVector::from(vec![
1386 SEMANTIC_TYPE_FIELD,
1387 SEMANTIC_TYPE_TIME_INDEX,
1388 ])) as _,
1389 ];
1390
1391 describe_table_test_by_schema(table_name, schema, data, expected_columns)
1392 }
1393
1394 fn describe_table_test_by_schema(
1395 table_name: &str,
1396 schema: Vec<ColumnSchema>,
1397 data: Vec<VectorRef>,
1398 expected_columns: Vec<VectorRef>,
1399 ) -> Result<()> {
1400 let table_schema = SchemaRef::new(Schema::new(schema));
1401 let table = prepare_describe_table(table_name, table_schema, data);
1402
1403 let expected =
1404 RecordBatches::try_from_columns(DESCRIBE_TABLE_OUTPUT_SCHEMA.clone(), expected_columns)
1405 .context(error::CreateRecordBatchSnafu)?;
1406
1407 if let OutputData::RecordBatches(res) = describe_table(table)?.data {
1408 assert_eq!(res.take(), expected.take());
1409 } else {
1410 panic!("describe table must return record batch");
1411 }
1412
1413 Ok(())
1414 }
1415
1416 fn prepare_describe_table(
1417 table_name: &str,
1418 table_schema: SchemaRef,
1419 data: Vec<VectorRef>,
1420 ) -> TableRef {
1421 let record_batch = RecordBatch::new(table_schema, data).unwrap();
1422 MemTable::table(table_name, record_batch)
1423 }
1424
1425 #[test]
1426 fn test_show_variable() {
1427 assert_eq!(
1428 exec_show_variable("SYSTEM_TIME_ZONE", "Asia/Shanghai").unwrap(),
1429 "UTC"
1430 );
1431 assert_eq!(
1432 exec_show_variable("SYSTEM_TIMEZONE", "Asia/Shanghai").unwrap(),
1433 "UTC"
1434 );
1435 assert_eq!(
1436 exec_show_variable("TIME_ZONE", "Asia/Shanghai").unwrap(),
1437 "Asia/Shanghai"
1438 );
1439 assert_eq!(
1440 exec_show_variable("TIMEZONE", "Asia/Shanghai").unwrap(),
1441 "Asia/Shanghai"
1442 );
1443 assert!(exec_show_variable("TIME ZONE", "Asia/Shanghai").is_err());
1444 assert!(exec_show_variable("SYSTEM TIME ZONE", "Asia/Shanghai").is_err());
1445 }
1446
1447 fn exec_show_variable(variable: &str, tz: &str) -> Result<String> {
1448 let stmt = ShowVariables {
1449 variable: ObjectName::from(vec![Ident::new(variable)]),
1450 };
1451 let ctx = Arc::new(
1452 QueryContextBuilder::default()
1453 .timezone(Timezone::from_tz_string(tz).unwrap())
1454 .build(),
1455 );
1456 match show_variable(stmt, ctx) {
1457 Ok(Output {
1458 data: OutputData::RecordBatches(record),
1459 ..
1460 }) => {
1461 let record = record.take().first().cloned().unwrap();
1462 Ok(record.iter_column_as_string(0).next().unwrap().unwrap())
1463 }
1464 Ok(_) => unreachable!(),
1465 Err(e) => Err(e),
1466 }
1467 }
1468}