1mod show_create_table;
16
17use std::collections::HashMap;
18use std::ops::ControlFlow;
19use std::sync::Arc;
20
21use catalog::CatalogManagerRef;
22use catalog::information_schema::{
23 CHARACTER_SETS, COLLATIONS, COLUMNS, FLOWS, KEY_COLUMN_USAGE, REGION_PEERS, SCHEMATA, TABLES,
24 VIEWS, columns, flows, key_column_usage, process_list, region_peers, schemata, tables,
25};
26use common_catalog::consts::{
27 INFORMATION_SCHEMA_NAME, SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_PRIMARY_KEY,
28 SEMANTIC_TYPE_TIME_INDEX,
29};
30use common_catalog::format_full_table_name;
31use common_datasource::file_format::{FileFormat, Format, infer_schemas};
32use common_datasource::lister::{Lister, Source};
33use common_datasource::object_store::build_backend;
34use common_datasource::util::find_dir_and_filename;
35use common_meta::SchemaOptions;
36use common_meta::key::flow::flow_info::FlowInfoValue;
37use common_query::Output;
38use common_query::prelude::greptime_timestamp;
39use common_recordbatch::RecordBatches;
40use common_recordbatch::adapter::RecordBatchStreamAdapter;
41use common_time::Timestamp;
42use common_time::timezone::get_timezone;
43use datafusion::common::ScalarValue;
44use datafusion::prelude::SessionContext;
45use datafusion_expr::{Expr, SortExpr, case, col, lit};
46use datatypes::prelude::*;
47use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, RawSchema, Schema};
48use datatypes::vectors::StringVector;
49use itertools::Itertools;
50use object_store::ObjectStore;
51use once_cell::sync::Lazy;
52use regex::Regex;
53use session::context::{Channel, QueryContextRef};
54pub use show_create_table::create_table_stmt;
55use snafu::{OptionExt, ResultExt, ensure};
56use sql::ast::{Ident, visit_expressions_mut};
57use sql::parser::ParserContext;
58use sql::statements::OptionMap;
59use sql::statements::create::{CreateDatabase, CreateFlow, CreateView, Partitions, SqlOrTql};
60use sql::statements::show::{
61 ShowColumns, ShowDatabases, ShowFlows, ShowIndex, ShowKind, ShowProcessList, ShowRegion,
62 ShowTableStatus, ShowTables, ShowVariables, ShowViews,
63};
64use sql::statements::statement::Statement;
65use sqlparser::ast::ObjectName;
66use store_api::metric_engine_consts::{is_metric_engine, is_metric_engine_internal_column};
67use table::TableRef;
68use table::requests::{FILE_TABLE_LOCATION_KEY, FILE_TABLE_PATTERN_KEY};
69
70use crate::QueryEngineRef;
71use crate::error::{self, Result, UnsupportedVariableSnafu};
72use crate::planner::DfLogicalPlanner;
73
// MySQL-compatible output column names used when projecting
// information_schema tables for the various `SHOW ...` statements.
const SCHEMAS_COLUMN: &str = "Database";
const OPTIONS_COLUMN: &str = "Options";
const VIEWS_COLUMN: &str = "Views";
const FLOWS_COLUMN: &str = "Flows";
const FIELD_COLUMN: &str = "Field";
const TABLE_TYPE_COLUMN: &str = "Table_type";
const COLUMN_NAME_COLUMN: &str = "Column";
const COLUMN_GREPTIME_TYPE_COLUMN: &str = "Greptime_type";
const COLUMN_TYPE_COLUMN: &str = "Type";
const COLUMN_KEY_COLUMN: &str = "Key";
const COLUMN_EXTRA_COLUMN: &str = "Extra";
const COLUMN_PRIVILEGES_COLUMN: &str = "Privileges";
const COLUMN_COLLATION_COLUMN: &str = "Collation";
const COLUMN_NULLABLE_COLUMN: &str = "Null";
const COLUMN_DEFAULT_COLUMN: &str = "Default";
const COLUMN_COMMENT_COLUMN: &str = "Comment";
const COLUMN_SEMANTIC_TYPE_COLUMN: &str = "Semantic Type";

// Common cell values used in DESCRIBE / SHOW INDEX output.
const YES_STR: &str = "YES";
const NO_STR: &str = "NO";
const PRI_KEY: &str = "PRI";
const TIME_INDEX: &str = "TIME INDEX";

// Output column names mirroring MySQL's `SHOW INDEX` result layout.
const INDEX_TABLE_COLUMN: &str = "Table";
// NOTE(review): "NONT" looks like a typo for "NON"; renaming the const would
// touch every use site in this file, so it is kept as-is here.
const INDEX_NONT_UNIQUE_COLUMN: &str = "Non_unique";
const INDEX_CARDINALITY_COLUMN: &str = "Cardinality";
const INDEX_SUB_PART_COLUMN: &str = "Sub_part";
const INDEX_PACKED_COLUMN: &str = "Packed";
const INDEX_INDEX_TYPE_COLUMN: &str = "Index_type";
const INDEX_COMMENT_COLUMN: &str = "Index_comment";
const INDEX_VISIBLE_COLUMN: &str = "Visible";
const INDEX_EXPRESSION_COLUMN: &str = "Expression";
const INDEX_KEY_NAME_COLUMN: &str = "Key_name";
const INDEX_SEQ_IN_INDEX_COLUMN: &str = "Seq_in_index";
110
111static DESCRIBE_TABLE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
112 Arc::new(Schema::new(vec![
113 ColumnSchema::new(
114 COLUMN_NAME_COLUMN,
115 ConcreteDataType::string_datatype(),
116 false,
117 ),
118 ColumnSchema::new(
119 COLUMN_TYPE_COLUMN,
120 ConcreteDataType::string_datatype(),
121 false,
122 ),
123 ColumnSchema::new(COLUMN_KEY_COLUMN, ConcreteDataType::string_datatype(), true),
124 ColumnSchema::new(
125 COLUMN_NULLABLE_COLUMN,
126 ConcreteDataType::string_datatype(),
127 false,
128 ),
129 ColumnSchema::new(
130 COLUMN_DEFAULT_COLUMN,
131 ConcreteDataType::string_datatype(),
132 false,
133 ),
134 ColumnSchema::new(
135 COLUMN_SEMANTIC_TYPE_COLUMN,
136 ConcreteDataType::string_datatype(),
137 false,
138 ),
139 ]))
140});
141
142static SHOW_CREATE_DATABASE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
143 Arc::new(Schema::new(vec![
144 ColumnSchema::new("Database", ConcreteDataType::string_datatype(), false),
145 ColumnSchema::new(
146 "Create Database",
147 ConcreteDataType::string_datatype(),
148 false,
149 ),
150 ]))
151});
152
153static SHOW_CREATE_TABLE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
154 Arc::new(Schema::new(vec![
155 ColumnSchema::new("Table", ConcreteDataType::string_datatype(), false),
156 ColumnSchema::new("Create Table", ConcreteDataType::string_datatype(), false),
157 ]))
158});
159
160static SHOW_CREATE_FLOW_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
161 Arc::new(Schema::new(vec![
162 ColumnSchema::new("Flow", ConcreteDataType::string_datatype(), false),
163 ColumnSchema::new("Create Flow", ConcreteDataType::string_datatype(), false),
164 ]))
165});
166
167static SHOW_CREATE_VIEW_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
168 Arc::new(Schema::new(vec![
169 ColumnSchema::new("View", ConcreteDataType::string_datatype(), false),
170 ColumnSchema::new("Create View", ConcreteDataType::string_datatype(), false),
171 ]))
172});
173
/// Returns a DataFusion expression for the SQL `NULL` literal.
fn null() -> Expr {
    lit(ScalarValue::Null)
}
177
178pub async fn show_databases(
179 stmt: ShowDatabases,
180 query_engine: &QueryEngineRef,
181 catalog_manager: &CatalogManagerRef,
182 query_ctx: QueryContextRef,
183) -> Result<Output> {
184 let projects = if stmt.full {
185 vec![
186 (schemata::SCHEMA_NAME, SCHEMAS_COLUMN),
187 (schemata::SCHEMA_OPTS, OPTIONS_COLUMN),
188 ]
189 } else {
190 vec![(schemata::SCHEMA_NAME, SCHEMAS_COLUMN)]
191 };
192
193 let filters = vec![col(schemata::CATALOG_NAME).eq(lit(query_ctx.current_catalog()))];
194 let like_field = Some(schemata::SCHEMA_NAME);
195 let sort = vec![col(schemata::SCHEMA_NAME).sort(true, true)];
196
197 query_from_information_schema_table(
198 query_engine,
199 catalog_manager,
200 query_ctx,
201 SCHEMATA,
202 vec![],
203 projects,
204 filters,
205 like_field,
206 sort,
207 stmt.kind,
208 )
209 .await
210}
211
212fn replace_column_in_expr(expr: &mut sqlparser::ast::Expr, from_column: &str, to_column: &str) {
215 let _ = visit_expressions_mut(expr, |e| {
216 match e {
217 sqlparser::ast::Expr::Identifier(ident) => {
218 if ident.value.eq_ignore_ascii_case(from_column) {
219 ident.value = to_column.to_string();
220 }
221 }
222 sqlparser::ast::Expr::CompoundIdentifier(idents) => {
223 if let Some(last) = idents.last_mut()
224 && last.value.eq_ignore_ascii_case(from_column)
225 {
226 last.value = to_column.to_string();
227 }
228 }
229 _ => {}
230 }
231 ControlFlow::<()>::Continue(())
232 });
233}
234
#[allow(clippy::too_many_arguments)]
/// Shared driver for all `SHOW ...` statements backed by an
/// `information_schema` table.
///
/// Builds a DataFrame pipeline over `table_name` in this order:
/// 1. apply `filters` (one `.filter()` per expression);
/// 2. apply the `LIKE` pattern from `kind` against `like_field`, if both are
///    present;
/// 3. apply `sort`;
/// 4. project: `select` wins if non-empty, otherwise `projects` is turned
///    into `col(src).alias(dst)` expressions;
/// 5. rename `projects` pairs (needed for the `select` path, and a no-op
///    after the alias projection);
/// 6. for `ShowKind::Where`, plan the filter expression against the
///    projected schema (so it can reference the *output* column names) and
///    apply it.
///
/// Returns a streaming [`Output`]. Errors surface as `CatalogSnafu`,
/// `TableNotFoundSnafu`, `PlanSqlSnafu` or `CreateRecordBatchSnafu`.
async fn query_from_information_schema_table(
    query_engine: &QueryEngineRef,
    catalog_manager: &CatalogManagerRef,
    query_ctx: QueryContextRef,
    table_name: &str,
    select: Vec<Expr>,
    projects: Vec<(&str, &str)>,
    filters: Vec<Expr>,
    like_field: Option<&str>,
    sort: Vec<SortExpr>,
    kind: ShowKind,
) -> Result<Output> {
    // Resolve the information_schema table in the session's catalog.
    let table = catalog_manager
        .table(
            query_ctx.current_catalog(),
            INFORMATION_SCHEMA_NAME,
            table_name,
            Some(&query_ctx),
        )
        .await
        .context(error::CatalogSnafu)?
        .with_context(|| error::TableNotFoundSnafu {
            table: format_full_table_name(
                query_ctx.current_catalog(),
                INFORMATION_SCHEMA_NAME,
                table_name,
            ),
        })?;

    let dataframe = query_engine.read_table(table)?;

    // Step 1: caller-provided filters (schema/catalog predicates etc.).
    let dataframe = filters.into_iter().try_fold(dataframe, |df, expr| {
        df.filter(expr).context(error::PlanSqlSnafu)
    })?;

    // Step 2: `SHOW ... LIKE 'pattern'` filters on the designated column.
    let dataframe = if let (ShowKind::Like(ident), Some(field)) = (&kind, like_field) {
        dataframe
            .filter(col(field).like(lit(ident.value.clone())))
            .context(error::PlanSqlSnafu)?
    } else {
        dataframe
    };

    // Step 3: optional ordering of the result set.
    let dataframe = if sort.is_empty() {
        dataframe
    } else {
        dataframe.sort(sort).context(error::PlanSqlSnafu)?
    };

    // Step 4: projection. An explicit `select` takes precedence; otherwise
    // `projects` is applied as aliased column references.
    let dataframe = if select.is_empty() {
        if projects.is_empty() {
            dataframe
        } else {
            let projection = projects
                .iter()
                .map(|x| col(x.0).alias(x.1))
                .collect::<Vec<_>>();
            dataframe.select(projection).context(error::PlanSqlSnafu)?
        }
    } else {
        dataframe.select(select).context(error::PlanSqlSnafu)?
    };

    // Step 5: rename source columns to output names (relevant on the
    // `select` path, where the aliases above were not applied).
    let dataframe = projects
        .into_iter()
        .try_fold(dataframe, |df, (column, renamed_column)| {
            df.with_column_renamed(column, renamed_column)
                .context(error::PlanSqlSnafu)
        })?;

    // Step 6: `SHOW ... WHERE expr` — plan the filter against the renamed
    // (output) schema via a view, so the expression may use output names.
    let dataframe = match kind {
        ShowKind::All | ShowKind::Like(_) => {
            dataframe
        }
        ShowKind::Where(filter) => {
            let view = dataframe.into_view();
            let dataframe = SessionContext::new_with_state(
                query_engine
                    .engine_context(query_ctx.clone())
                    .state()
                    .clone(),
            )
            .read_table(view)?;

            let planner = query_engine.planner();
            let planner = planner
                .as_any()
                .downcast_ref::<DfLogicalPlanner>()
                .expect("Must be the datafusion planner");

            let filter = planner
                .sql_to_expr(filter, dataframe.schema(), false, query_ctx)
                .await?;

            dataframe.filter(filter).context(error::PlanSqlSnafu)?
        }
    };

    let stream = dataframe.execute_stream().await?;

    Ok(Output::new_with_stream(Box::pin(
        RecordBatchStreamAdapter::try_new(stream).context(error::CreateRecordBatchSnafu)?,
    )))
}
356
357pub async fn show_columns(
359 stmt: ShowColumns,
360 query_engine: &QueryEngineRef,
361 catalog_manager: &CatalogManagerRef,
362 query_ctx: QueryContextRef,
363) -> Result<Output> {
364 let schema_name = if let Some(database) = stmt.database {
365 database
366 } else {
367 query_ctx.current_schema()
368 };
369
370 let projects = if stmt.full {
371 vec![
372 (columns::COLUMN_NAME, FIELD_COLUMN),
373 (columns::DATA_TYPE, COLUMN_TYPE_COLUMN),
374 (columns::COLLATION_NAME, COLUMN_COLLATION_COLUMN),
375 (columns::IS_NULLABLE, COLUMN_NULLABLE_COLUMN),
376 (columns::COLUMN_KEY, COLUMN_KEY_COLUMN),
377 (columns::COLUMN_DEFAULT, COLUMN_DEFAULT_COLUMN),
378 (columns::COLUMN_COMMENT, COLUMN_COMMENT_COLUMN),
379 (columns::PRIVILEGES, COLUMN_PRIVILEGES_COLUMN),
380 (columns::EXTRA, COLUMN_EXTRA_COLUMN),
381 (columns::GREPTIME_DATA_TYPE, COLUMN_GREPTIME_TYPE_COLUMN),
382 ]
383 } else {
384 vec![
385 (columns::COLUMN_NAME, FIELD_COLUMN),
386 (columns::DATA_TYPE, COLUMN_TYPE_COLUMN),
387 (columns::IS_NULLABLE, COLUMN_NULLABLE_COLUMN),
388 (columns::COLUMN_KEY, COLUMN_KEY_COLUMN),
389 (columns::COLUMN_DEFAULT, COLUMN_DEFAULT_COLUMN),
390 (columns::EXTRA, COLUMN_EXTRA_COLUMN),
391 (columns::GREPTIME_DATA_TYPE, COLUMN_GREPTIME_TYPE_COLUMN),
392 ]
393 };
394
395 let filters = vec![
396 col(columns::TABLE_NAME).eq(lit(&stmt.table)),
397 col(columns::TABLE_SCHEMA).eq(lit(schema_name.clone())),
398 col(columns::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
399 ];
400 let like_field = Some(columns::COLUMN_NAME);
401 let sort = vec![col(columns::COLUMN_NAME).sort(true, true)];
402
403 query_from_information_schema_table(
404 query_engine,
405 catalog_manager,
406 query_ctx,
407 COLUMNS,
408 vec![],
409 projects,
410 filters,
411 like_field,
412 sort,
413 stmt.kind,
414 )
415 .await
416}
417
418pub async fn show_index(
420 stmt: ShowIndex,
421 query_engine: &QueryEngineRef,
422 catalog_manager: &CatalogManagerRef,
423 query_ctx: QueryContextRef,
424) -> Result<Output> {
425 let schema_name = if let Some(database) = stmt.database {
426 database
427 } else {
428 query_ctx.current_schema()
429 };
430
431 let select = vec![
432 col(key_column_usage::TABLE_NAME).alias(INDEX_TABLE_COLUMN),
433 lit(1).alias(INDEX_NONT_UNIQUE_COLUMN),
435 col(key_column_usage::CONSTRAINT_NAME).alias(INDEX_KEY_NAME_COLUMN),
436 col(key_column_usage::ORDINAL_POSITION).alias(INDEX_SEQ_IN_INDEX_COLUMN),
437 col(key_column_usage::COLUMN_NAME).alias(INDEX_COLUMN_NAME_COLUMN),
438 lit("A").alias(COLUMN_COLLATION_COLUMN),
440 null().alias(INDEX_CARDINALITY_COLUMN),
441 null().alias(INDEX_SUB_PART_COLUMN),
442 null().alias(INDEX_PACKED_COLUMN),
443 case(col(key_column_usage::CONSTRAINT_NAME))
448 .when(lit(TIME_INDEX), lit(NO_STR))
449 .otherwise(lit(YES_STR))
450 .context(error::PlanSqlSnafu)?
451 .alias(COLUMN_NULLABLE_COLUMN),
452 col(key_column_usage::GREPTIME_INDEX_TYPE).alias(INDEX_INDEX_TYPE_COLUMN),
453 lit("").alias(COLUMN_COMMENT_COLUMN),
454 lit("").alias(INDEX_COMMENT_COLUMN),
455 lit(YES_STR).alias(INDEX_VISIBLE_COLUMN),
456 null().alias(INDEX_EXPRESSION_COLUMN),
457 ];
458
459 let projects = vec![
460 (key_column_usage::TABLE_NAME, INDEX_TABLE_COLUMN),
461 (INDEX_NONT_UNIQUE_COLUMN, INDEX_NONT_UNIQUE_COLUMN),
462 (key_column_usage::CONSTRAINT_NAME, INDEX_KEY_NAME_COLUMN),
463 (
464 key_column_usage::ORDINAL_POSITION,
465 INDEX_SEQ_IN_INDEX_COLUMN,
466 ),
467 (key_column_usage::COLUMN_NAME, INDEX_COLUMN_NAME_COLUMN),
468 (COLUMN_COLLATION_COLUMN, COLUMN_COLLATION_COLUMN),
469 (INDEX_CARDINALITY_COLUMN, INDEX_CARDINALITY_COLUMN),
470 (INDEX_SUB_PART_COLUMN, INDEX_SUB_PART_COLUMN),
471 (INDEX_PACKED_COLUMN, INDEX_PACKED_COLUMN),
472 (COLUMN_NULLABLE_COLUMN, COLUMN_NULLABLE_COLUMN),
473 (
474 key_column_usage::GREPTIME_INDEX_TYPE,
475 INDEX_INDEX_TYPE_COLUMN,
476 ),
477 (COLUMN_COMMENT_COLUMN, COLUMN_COMMENT_COLUMN),
478 (INDEX_COMMENT_COLUMN, INDEX_COMMENT_COLUMN),
479 (INDEX_VISIBLE_COLUMN, INDEX_VISIBLE_COLUMN),
480 (INDEX_EXPRESSION_COLUMN, INDEX_EXPRESSION_COLUMN),
481 ];
482
483 let filters = vec![
484 col(key_column_usage::TABLE_NAME).eq(lit(&stmt.table)),
485 col(key_column_usage::TABLE_SCHEMA).eq(lit(schema_name.clone())),
486 col(key_column_usage::REAL_TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
487 ];
488 let like_field = None;
489 let sort = vec![col(columns::COLUMN_NAME).sort(true, true)];
490
491 query_from_information_schema_table(
492 query_engine,
493 catalog_manager,
494 query_ctx,
495 KEY_COLUMN_USAGE,
496 select,
497 projects,
498 filters,
499 like_field,
500 sort,
501 stmt.kind,
502 )
503 .await
504}
505
506pub async fn show_region(
508 stmt: ShowRegion,
509 query_engine: &QueryEngineRef,
510 catalog_manager: &CatalogManagerRef,
511 query_ctx: QueryContextRef,
512) -> Result<Output> {
513 let schema_name = if let Some(database) = stmt.database {
514 database
515 } else {
516 query_ctx.current_schema()
517 };
518
519 let filters = vec![
520 col(region_peers::TABLE_NAME).eq(lit(&stmt.table)),
521 col(region_peers::TABLE_SCHEMA).eq(lit(schema_name.clone())),
522 col(region_peers::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
523 ];
524 let projects = vec![
525 (region_peers::TABLE_NAME, "Table"),
526 (region_peers::REGION_ID, "Region"),
527 (region_peers::PEER_ID, "Peer"),
528 (region_peers::IS_LEADER, "Leader"),
529 ];
530
531 let like_field = None;
532 let sort = vec![
533 col(columns::REGION_ID).sort(true, true),
534 col(columns::PEER_ID).sort(true, true),
535 ];
536
537 query_from_information_schema_table(
538 query_engine,
539 catalog_manager,
540 query_ctx,
541 REGION_PEERS,
542 vec![],
543 projects,
544 filters,
545 like_field,
546 sort,
547 stmt.kind,
548 )
549 .await
550}
551
552pub async fn show_tables(
554 stmt: ShowTables,
555 query_engine: &QueryEngineRef,
556 catalog_manager: &CatalogManagerRef,
557 query_ctx: QueryContextRef,
558) -> Result<Output> {
559 let schema_name = if let Some(database) = stmt.database {
560 database
561 } else {
562 query_ctx.current_schema()
563 };
564
565 let tables_column = format!("Tables_in_{}", schema_name);
567 let projects = if stmt.full {
568 vec![
569 (tables::TABLE_NAME, tables_column.as_str()),
570 (tables::TABLE_TYPE, TABLE_TYPE_COLUMN),
571 ]
572 } else {
573 vec![(tables::TABLE_NAME, tables_column.as_str())]
574 };
575 let filters = vec![
576 col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
577 col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
578 ];
579 let like_field = Some(tables::TABLE_NAME);
580 let sort = vec![col(tables::TABLE_NAME).sort(true, true)];
581
582 let kind = match stmt.kind {
585 ShowKind::Where(mut filter) => {
586 replace_column_in_expr(&mut filter, "Tables", &tables_column);
587 ShowKind::Where(filter)
588 }
589 other => other,
590 };
591
592 query_from_information_schema_table(
593 query_engine,
594 catalog_manager,
595 query_ctx,
596 TABLES,
597 vec![],
598 projects,
599 filters,
600 like_field,
601 sort,
602 kind,
603 )
604 .await
605}
606
607pub async fn show_table_status(
609 stmt: ShowTableStatus,
610 query_engine: &QueryEngineRef,
611 catalog_manager: &CatalogManagerRef,
612 query_ctx: QueryContextRef,
613) -> Result<Output> {
614 let schema_name = if let Some(database) = stmt.database {
615 database
616 } else {
617 query_ctx.current_schema()
618 };
619
620 let projects = vec![
622 (tables::TABLE_NAME, "Name"),
623 (tables::ENGINE, "Engine"),
624 (tables::VERSION, "Version"),
625 (tables::ROW_FORMAT, "Row_format"),
626 (tables::TABLE_ROWS, "Rows"),
627 (tables::AVG_ROW_LENGTH, "Avg_row_length"),
628 (tables::DATA_LENGTH, "Data_length"),
629 (tables::MAX_DATA_LENGTH, "Max_data_length"),
630 (tables::INDEX_LENGTH, "Index_length"),
631 (tables::DATA_FREE, "Data_free"),
632 (tables::AUTO_INCREMENT, "Auto_increment"),
633 (tables::CREATE_TIME, "Create_time"),
634 (tables::UPDATE_TIME, "Update_time"),
635 (tables::CHECK_TIME, "Check_time"),
636 (tables::TABLE_COLLATION, "Collation"),
637 (tables::CHECKSUM, "Checksum"),
638 (tables::CREATE_OPTIONS, "Create_options"),
639 (tables::TABLE_COMMENT, "Comment"),
640 ];
641
642 let filters = vec![
643 col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
644 col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
645 ];
646 let like_field = Some(tables::TABLE_NAME);
647 let sort = vec![col(tables::TABLE_NAME).sort(true, true)];
648
649 query_from_information_schema_table(
650 query_engine,
651 catalog_manager,
652 query_ctx,
653 TABLES,
654 vec![],
655 projects,
656 filters,
657 like_field,
658 sort,
659 stmt.kind,
660 )
661 .await
662}
663
664pub async fn show_collations(
666 kind: ShowKind,
667 query_engine: &QueryEngineRef,
668 catalog_manager: &CatalogManagerRef,
669 query_ctx: QueryContextRef,
670) -> Result<Output> {
671 let projects = vec![
673 ("collation_name", "Collation"),
674 ("character_set_name", "Charset"),
675 ("id", "Id"),
676 ("is_default", "Default"),
677 ("is_compiled", "Compiled"),
678 ("sortlen", "Sortlen"),
679 ];
680
681 let filters = vec![];
682 let like_field = Some("collation_name");
683 let sort = vec![];
684
685 query_from_information_schema_table(
686 query_engine,
687 catalog_manager,
688 query_ctx,
689 COLLATIONS,
690 vec![],
691 projects,
692 filters,
693 like_field,
694 sort,
695 kind,
696 )
697 .await
698}
699
700pub async fn show_charsets(
702 kind: ShowKind,
703 query_engine: &QueryEngineRef,
704 catalog_manager: &CatalogManagerRef,
705 query_ctx: QueryContextRef,
706) -> Result<Output> {
707 let projects = vec![
709 ("character_set_name", "Charset"),
710 ("description", "Description"),
711 ("default_collate_name", "Default collation"),
712 ("maxlen", "Maxlen"),
713 ];
714
715 let filters = vec![];
716 let like_field = Some("character_set_name");
717 let sort = vec![];
718
719 query_from_information_schema_table(
720 query_engine,
721 catalog_manager,
722 query_ctx,
723 CHARACTER_SETS,
724 vec![],
725 projects,
726 filters,
727 like_field,
728 sort,
729 kind,
730 )
731 .await
732}
733
734pub fn show_variable(stmt: ShowVariables, query_ctx: QueryContextRef) -> Result<Output> {
735 let variable = stmt.variable.to_string().to_uppercase();
736 let value = match variable.as_str() {
737 "SYSTEM_TIME_ZONE" | "SYSTEM_TIMEZONE" => get_timezone(None).to_string(),
738 "TIME_ZONE" | "TIMEZONE" => query_ctx.timezone().to_string(),
739 "READ_PREFERENCE" => query_ctx.read_preference().to_string(),
740 "DATESTYLE" => {
741 let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style();
742 format!("{}, {}", style, order)
743 }
744 "MAX_EXECUTION_TIME" => {
745 if query_ctx.channel() == Channel::Mysql {
746 query_ctx.query_timeout_as_millis().to_string()
747 } else {
748 return UnsupportedVariableSnafu { name: variable }.fail();
749 }
750 }
751 "STATEMENT_TIMEOUT" => {
752 if query_ctx.channel() == Channel::Postgres {
754 let mut timeout = query_ctx.query_timeout_as_millis().to_string();
755 timeout.push_str("ms");
756 timeout
757 } else {
758 return UnsupportedVariableSnafu { name: variable }.fail();
759 }
760 }
761 _ => return UnsupportedVariableSnafu { name: variable }.fail(),
762 };
763 let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
764 variable,
765 ConcreteDataType::string_datatype(),
766 false,
767 )]));
768 let records = RecordBatches::try_from_columns(
769 schema,
770 vec![Arc::new(StringVector::from(vec![value])) as _],
771 )
772 .context(error::CreateRecordBatchSnafu)?;
773 Ok(Output::new_with_record_batches(records))
774}
775
776pub async fn show_status(_query_ctx: QueryContextRef) -> Result<Output> {
777 let schema = Arc::new(Schema::new(vec![
778 ColumnSchema::new("Variable_name", ConcreteDataType::string_datatype(), false),
779 ColumnSchema::new("Value", ConcreteDataType::string_datatype(), true),
780 ]));
781 let records = RecordBatches::try_from_columns(
782 schema,
783 vec![
784 Arc::new(StringVector::from(Vec::<&str>::new())) as _,
785 Arc::new(StringVector::from(Vec::<&str>::new())) as _,
786 ],
787 )
788 .context(error::CreateRecordBatchSnafu)?;
789 Ok(Output::new_with_record_batches(records))
790}
791
792pub async fn show_search_path(_query_ctx: QueryContextRef) -> Result<Output> {
793 let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
794 "search_path",
795 ConcreteDataType::string_datatype(),
796 false,
797 )]));
798 let records = RecordBatches::try_from_columns(
799 schema,
800 vec![Arc::new(StringVector::from(vec![_query_ctx.current_schema()])) as _],
801 )
802 .context(error::CreateRecordBatchSnafu)?;
803 Ok(Output::new_with_record_batches(records))
804}
805
806pub fn show_create_database(database_name: &str, options: OptionMap) -> Result<Output> {
807 let stmt = CreateDatabase {
808 name: ObjectName::from(vec![Ident::new(database_name)]),
809 if_not_exists: true,
810 options,
811 };
812 let sql = format!("{stmt}");
813 let columns = vec![
814 Arc::new(StringVector::from(vec![database_name.to_string()])) as _,
815 Arc::new(StringVector::from(vec![sql])) as _,
816 ];
817 let records =
818 RecordBatches::try_from_columns(SHOW_CREATE_DATABASE_OUTPUT_SCHEMA.clone(), columns)
819 .context(error::CreateRecordBatchSnafu)?;
820 Ok(Output::new_with_record_batches(records))
821}
822
823pub fn show_create_table(
824 table: TableRef,
825 schema_options: Option<SchemaOptions>,
826 partitions: Option<Partitions>,
827 query_ctx: QueryContextRef,
828) -> Result<Output> {
829 let table_info = table.table_info();
830 let table_name = &table_info.name;
831
832 let quote_style = query_ctx.quote_style();
833
834 let mut stmt = create_table_stmt(&table_info, schema_options, quote_style)?;
835 stmt.partitions = partitions.map(|mut p| {
836 p.set_quote(quote_style);
837 p
838 });
839 let sql = format!("{}", stmt);
840 let columns = vec![
841 Arc::new(StringVector::from(vec![table_name.clone()])) as _,
842 Arc::new(StringVector::from(vec![sql])) as _,
843 ];
844 let records = RecordBatches::try_from_columns(SHOW_CREATE_TABLE_OUTPUT_SCHEMA.clone(), columns)
845 .context(error::CreateRecordBatchSnafu)?;
846
847 Ok(Output::new_with_record_batches(records))
848}
849
850pub fn show_create_foreign_table_for_pg(
851 table: TableRef,
852 _query_ctx: QueryContextRef,
853) -> Result<Output> {
854 let table_info = table.table_info();
855
856 let table_meta = &table_info.meta;
857 let table_name = &table_info.name;
858 let schema = &table_info.meta.schema;
859 let is_metric_engine = is_metric_engine(&table_meta.engine);
860
861 let columns = schema
862 .column_schemas()
863 .iter()
864 .filter_map(|c| {
865 if is_metric_engine && is_metric_engine_internal_column(&c.name) {
866 None
867 } else {
868 Some(format!(
869 "\"{}\" {}",
870 c.name,
871 c.data_type.postgres_datatype_name()
872 ))
873 }
874 })
875 .join(",\n ");
876
877 let sql = format!(
878 r#"CREATE FOREIGN TABLE ft_{} (
879 {}
880)
881SERVER greptimedb
882OPTIONS (table_name '{}')"#,
883 table_name, columns, table_name
884 );
885
886 let columns = vec![
887 Arc::new(StringVector::from(vec![table_name.clone()])) as _,
888 Arc::new(StringVector::from(vec![sql])) as _,
889 ];
890 let records = RecordBatches::try_from_columns(SHOW_CREATE_TABLE_OUTPUT_SCHEMA.clone(), columns)
891 .context(error::CreateRecordBatchSnafu)?;
892
893 Ok(Output::new_with_record_batches(records))
894}
895
896pub fn show_create_view(
897 view_name: ObjectName,
898 definition: &str,
899 query_ctx: QueryContextRef,
900) -> Result<Output> {
901 let mut parser_ctx =
902 ParserContext::new(query_ctx.sql_dialect(), definition).context(error::SqlSnafu)?;
903
904 let Statement::CreateView(create_view) =
905 parser_ctx.parse_statement().context(error::SqlSnafu)?
906 else {
907 unreachable!();
909 };
910
911 let stmt = CreateView {
912 name: view_name.clone(),
913 columns: create_view.columns,
914 query: create_view.query,
915 or_replace: create_view.or_replace,
916 if_not_exists: create_view.if_not_exists,
917 };
918
919 let sql = format!("{}", stmt);
920 let columns = vec![
921 Arc::new(StringVector::from(vec![view_name.to_string()])) as _,
922 Arc::new(StringVector::from(vec![sql])) as _,
923 ];
924 let records = RecordBatches::try_from_columns(SHOW_CREATE_VIEW_OUTPUT_SCHEMA.clone(), columns)
925 .context(error::CreateRecordBatchSnafu)?;
926
927 Ok(Output::new_with_record_batches(records))
928}
929
930pub async fn show_views(
932 stmt: ShowViews,
933 query_engine: &QueryEngineRef,
934 catalog_manager: &CatalogManagerRef,
935 query_ctx: QueryContextRef,
936) -> Result<Output> {
937 let schema_name = if let Some(database) = stmt.database {
938 database
939 } else {
940 query_ctx.current_schema()
941 };
942
943 let projects = vec![(tables::TABLE_NAME, VIEWS_COLUMN)];
944 let filters = vec![
945 col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
946 col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
947 ];
948 let like_field = Some(tables::TABLE_NAME);
949 let sort = vec![col(tables::TABLE_NAME).sort(true, true)];
950
951 query_from_information_schema_table(
952 query_engine,
953 catalog_manager,
954 query_ctx,
955 VIEWS,
956 vec![],
957 projects,
958 filters,
959 like_field,
960 sort,
961 stmt.kind,
962 )
963 .await
964}
965
966pub async fn show_flows(
968 stmt: ShowFlows,
969 query_engine: &QueryEngineRef,
970 catalog_manager: &CatalogManagerRef,
971 query_ctx: QueryContextRef,
972) -> Result<Output> {
973 let projects = vec![(flows::FLOW_NAME, FLOWS_COLUMN)];
974 let filters = vec![col(flows::TABLE_CATALOG).eq(lit(query_ctx.current_catalog()))];
975 let like_field = Some(flows::FLOW_NAME);
976 let sort = vec![col(flows::FLOW_NAME).sort(true, true)];
977
978 query_from_information_schema_table(
979 query_engine,
980 catalog_manager,
981 query_ctx,
982 FLOWS,
983 vec![],
984 projects,
985 filters,
986 like_field,
987 sort,
988 stmt.kind,
989 )
990 .await
991}
992
#[cfg(feature = "enterprise")]
/// Executes `SHOW TRIGGERS` (enterprise only) against
/// `information_schema.triggers`.
pub async fn show_triggers(
    stmt: sql::statements::show::trigger::ShowTriggers,
    query_engine: &QueryEngineRef,
    catalog_manager: &CatalogManagerRef,
    query_ctx: QueryContextRef,
) -> Result<Output> {
    // Source column in information_schema.triggers.
    const TRIGGER_NAME: &str = "trigger_name";
    // Output column name presented to the client.
    const TRIGGERS_COLUMN: &str = "Triggers";

    query_from_information_schema_table(
        query_engine,
        catalog_manager,
        query_ctx,
        catalog::information_schema::TRIGGERS,
        vec![],
        vec![(TRIGGER_NAME, TRIGGERS_COLUMN)],
        vec![],
        Some(TRIGGER_NAME),
        vec![col(TRIGGER_NAME).sort(true, true)],
        stmt.kind,
    )
    .await
}
1021
/// Renders `SHOW CREATE FLOW` output by re-parsing the flow's stored SQL and
/// rebuilding a `CREATE FLOW` statement from the persisted metadata.
pub fn show_create_flow(
    flow_name: ObjectName,
    flow_val: FlowInfoValue,
    query_ctx: QueryContextRef,
) -> Result<Output> {
    let mut parser_ctx =
        ParserContext::new(query_ctx.sql_dialect(), flow_val.raw_sql()).context(error::SqlSnafu)?;

    let query = parser_ctx.parse_statement().context(error::SqlSnafu)?;

    // TQL statements keep the original raw SQL verbatim; plain SQL is
    // re-rendered from the parsed statement.
    let raw_query = match &query {
        Statement::Tql(_) => flow_val.raw_sql().clone(),
        _ => query.to_string(),
    };

    let query = Box::new(SqlOrTql::try_from_statement(query, &raw_query).context(error::SqlSnafu)?);

    // An empty stored comment is rendered as "no comment".
    let comment = if flow_val.comment().is_empty() {
        None
    } else {
        Some(flow_val.comment().clone())
    };

    let stmt = CreateFlow {
        flow_name,
        sink_table_name: ObjectName::from(vec![Ident::new(&flow_val.sink_table_name().table_name)]),
        // Rendered with OR REPLACE off and IF NOT EXISTS on, regardless of
        // how the flow was originally created.
        or_replace: false,
        if_not_exists: true,
        expire_after: flow_val.expire_after(),
        eval_interval: flow_val.eval_interval(),
        comment,
        query,
    };

    let sql = format!("{}", stmt);
    let columns = vec![
        Arc::new(StringVector::from(vec![flow_val.flow_name().clone()])) as _,
        Arc::new(StringVector::from(vec![sql])) as _,
    ];
    let records = RecordBatches::try_from_columns(SHOW_CREATE_FLOW_OUTPUT_SCHEMA.clone(), columns)
        .context(error::CreateRecordBatchSnafu)?;

    Ok(Output::new_with_record_batches(records))
}
1069
1070pub fn describe_table(table: TableRef) -> Result<Output> {
1071 let table_info = table.table_info();
1072 let columns_schemas = table_info.meta.schema.column_schemas();
1073 let columns = vec![
1074 describe_column_names(columns_schemas),
1075 describe_column_types(columns_schemas),
1076 describe_column_keys(columns_schemas, &table_info.meta.primary_key_indices),
1077 describe_column_nullables(columns_schemas),
1078 describe_column_defaults(columns_schemas),
1079 describe_column_semantic_types(columns_schemas, &table_info.meta.primary_key_indices),
1080 ];
1081 let records = RecordBatches::try_from_columns(DESCRIBE_TABLE_OUTPUT_SCHEMA.clone(), columns)
1082 .context(error::CreateRecordBatchSnafu)?;
1083 Ok(Output::new_with_record_batches(records))
1084}
1085
1086fn describe_column_names(columns_schemas: &[ColumnSchema]) -> VectorRef {
1087 Arc::new(StringVector::from_iterator(
1088 columns_schemas.iter().map(|cs| cs.name.as_str()),
1089 ))
1090}
1091
1092fn describe_column_types(columns_schemas: &[ColumnSchema]) -> VectorRef {
1093 Arc::new(StringVector::from(
1094 columns_schemas
1095 .iter()
1096 .map(|cs| cs.data_type.name())
1097 .collect::<Vec<_>>(),
1098 ))
1099}
1100
1101fn describe_column_keys(
1102 columns_schemas: &[ColumnSchema],
1103 primary_key_indices: &[usize],
1104) -> VectorRef {
1105 Arc::new(StringVector::from_iterator(
1106 columns_schemas.iter().enumerate().map(|(i, cs)| {
1107 if cs.is_time_index() || primary_key_indices.contains(&i) {
1108 PRI_KEY
1109 } else {
1110 ""
1111 }
1112 }),
1113 ))
1114}
1115
1116fn describe_column_nullables(columns_schemas: &[ColumnSchema]) -> VectorRef {
1117 Arc::new(StringVector::from_iterator(columns_schemas.iter().map(
1118 |cs| {
1119 if cs.is_nullable() { YES_STR } else { NO_STR }
1120 },
1121 )))
1122}
1123
1124fn describe_column_defaults(columns_schemas: &[ColumnSchema]) -> VectorRef {
1125 Arc::new(StringVector::from(
1126 columns_schemas
1127 .iter()
1128 .map(|cs| {
1129 cs.default_constraint()
1130 .map_or(String::from(""), |dc| dc.to_string())
1131 })
1132 .collect::<Vec<String>>(),
1133 ))
1134}
1135
1136fn describe_column_semantic_types(
1137 columns_schemas: &[ColumnSchema],
1138 primary_key_indices: &[usize],
1139) -> VectorRef {
1140 Arc::new(StringVector::from_iterator(
1141 columns_schemas.iter().enumerate().map(|(i, cs)| {
1142 if primary_key_indices.contains(&i) {
1143 SEMANTIC_TYPE_PRIMARY_KEY
1144 } else if cs.is_time_index() {
1145 SEMANTIC_TYPE_TIME_INDEX
1146 } else {
1147 SEMANTIC_TYPE_FIELD
1148 }
1149 }),
1150 ))
1151}
1152
1153pub async fn prepare_file_table_files(
1155 options: &HashMap<String, String>,
1156) -> Result<(ObjectStore, Vec<String>)> {
1157 let url = options
1158 .get(FILE_TABLE_LOCATION_KEY)
1159 .context(error::MissingRequiredFieldSnafu {
1160 name: FILE_TABLE_LOCATION_KEY,
1161 })?;
1162
1163 let (dir, filename) = find_dir_and_filename(url);
1164 let source = if let Some(filename) = filename {
1165 Source::Filename(filename)
1166 } else {
1167 Source::Dir
1168 };
1169 let regex = options
1170 .get(FILE_TABLE_PATTERN_KEY)
1171 .map(|x| Regex::new(x))
1172 .transpose()
1173 .context(error::BuildRegexSnafu)?;
1174 let object_store = build_backend(url, options).context(error::BuildBackendSnafu)?;
1175 let lister = Lister::new(object_store.clone(), source, dir, regex);
1176 let files = lister
1181 .list()
1182 .await
1183 .context(error::ListObjectsSnafu)?
1184 .into_iter()
1185 .filter_map(|entry| {
1186 if entry.path().ends_with('/') {
1187 None
1188 } else {
1189 Some(entry.path().to_string())
1190 }
1191 })
1192 .collect::<Vec<_>>();
1193 Ok((object_store, files))
1194}
1195
1196pub async fn infer_file_table_schema(
1197 object_store: &ObjectStore,
1198 files: &[String],
1199 options: &HashMap<String, String>,
1200) -> Result<RawSchema> {
1201 let format = parse_file_table_format(options)?;
1202 let merged = infer_schemas(object_store, files, format.as_ref())
1203 .await
1204 .context(error::InferSchemaSnafu)?;
1205 Ok(RawSchema::from(
1206 &Schema::try_from(merged).context(error::ConvertSchemaSnafu)?,
1207 ))
1208}
1209
1210pub fn file_column_schemas_to_table(
1220 file_column_schemas: &[ColumnSchema],
1221) -> (Vec<ColumnSchema>, String) {
1222 let mut column_schemas = file_column_schemas.to_owned();
1223 if let Some(time_index_column) = column_schemas.iter().find(|c| c.is_time_index()) {
1224 let time_index = time_index_column.name.clone();
1225 return (column_schemas, time_index);
1226 }
1227
1228 let timestamp_type = ConcreteDataType::timestamp_millisecond_datatype();
1229 let default_zero = Value::Timestamp(Timestamp::new_millisecond(0));
1230 let timestamp_column_schema = ColumnSchema::new(greptime_timestamp(), timestamp_type, false)
1231 .with_time_index(true)
1232 .with_default_constraint(Some(ColumnDefaultConstraint::Value(default_zero)))
1233 .unwrap();
1234
1235 if let Some(column_schema) = column_schemas
1236 .iter_mut()
1237 .find(|column_schema| column_schema.name == greptime_timestamp())
1238 {
1239 *column_schema = timestamp_column_schema;
1241 } else {
1242 column_schemas.push(timestamp_column_schema);
1243 }
1244
1245 (column_schemas, greptime_timestamp().to_string())
1246}
1247
1248pub fn check_file_to_table_schema_compatibility(
1257 file_column_schemas: &[ColumnSchema],
1258 table_column_schemas: &[ColumnSchema],
1259) -> Result<()> {
1260 let file_schemas_map = file_column_schemas
1261 .iter()
1262 .map(|s| (s.name.clone(), s))
1263 .collect::<HashMap<_, _>>();
1264
1265 for table_column in table_column_schemas {
1266 if let Some(file_column) = file_schemas_map.get(&table_column.name) {
1267 ensure!(
1269 file_column
1270 .data_type
1271 .can_arrow_type_cast_to(&table_column.data_type),
1272 error::ColumnSchemaIncompatibleSnafu {
1273 column: table_column.name.clone(),
1274 file_type: file_column.data_type.clone(),
1275 table_type: table_column.data_type.clone(),
1276 }
1277 );
1278 } else {
1279 ensure!(
1280 table_column.is_nullable() || table_column.default_constraint().is_some(),
1281 error::ColumnSchemaNoDefaultSnafu {
1282 column: table_column.name.clone(),
1283 }
1284 );
1285 }
1286 }
1287
1288 Ok(())
1289}
1290
1291fn parse_file_table_format(options: &HashMap<String, String>) -> Result<Box<dyn FileFormat>> {
1292 Ok(
1293 match Format::try_from(options).context(error::ParseFileFormatSnafu)? {
1294 Format::Csv(format) => Box::new(format),
1295 Format::Json(format) => Box::new(format),
1296 Format::Parquet(format) => Box::new(format),
1297 Format::Orc(format) => Box::new(format),
1298 },
1299 )
1300}
1301
1302pub async fn show_processlist(
1303 stmt: ShowProcessList,
1304 query_engine: &QueryEngineRef,
1305 catalog_manager: &CatalogManagerRef,
1306 query_ctx: QueryContextRef,
1307) -> Result<Output> {
1308 let projects = if stmt.full {
1309 vec![
1310 (process_list::ID, "Id"),
1311 (process_list::CATALOG, "Catalog"),
1312 (process_list::SCHEMAS, "Schema"),
1313 (process_list::CLIENT, "Client"),
1314 (process_list::FRONTEND, "Frontend"),
1315 (process_list::START_TIMESTAMP, "Start Time"),
1316 (process_list::ELAPSED_TIME, "Elapsed Time"),
1317 (process_list::QUERY, "Query"),
1318 ]
1319 } else {
1320 vec![
1321 (process_list::ID, "Id"),
1322 (process_list::CATALOG, "Catalog"),
1323 (process_list::QUERY, "Query"),
1324 (process_list::ELAPSED_TIME, "Elapsed Time"),
1325 ]
1326 };
1327
1328 let filters = vec![];
1329 let like_field = None;
1330 let sort = vec![col("id").sort(true, true)];
1331 query_from_information_schema_table(
1332 query_engine,
1333 catalog_manager,
1334 query_ctx.clone(),
1335 "process_list",
1336 vec![],
1337 projects.clone(),
1338 filters,
1339 like_field,
1340 sort,
1341 ShowKind::All,
1342 )
1343 .await
1344}
1345
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use common_query::{Output, OutputData};
    use common_recordbatch::{RecordBatch, RecordBatches};
    use common_time::Timezone;
    use common_time::timestamp::TimeUnit;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema, SchemaRef};
    use datatypes::vectors::{StringVector, TimestampMillisecondVector, UInt32Vector, VectorRef};
    use session::context::QueryContextBuilder;
    use snafu::ResultExt;
    use sql::ast::{Ident, ObjectName};
    use sql::statements::show::ShowVariables;
    use table::TableRef;
    use table::test_util::MemTable;

    use super::show_variable;
    use crate::error;
    use crate::error::Result;
    use crate::sql::{
        DESCRIBE_TABLE_OUTPUT_SCHEMA, NO_STR, SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_TIME_INDEX,
        YES_STR, describe_table,
    };

    // `describe_table` on a two-column table (nullable field + time index
    // with a function default) must render every DESC TABLE column:
    // name, type, key, nullability, default, semantic type.
    #[test]
    fn test_describe_table_multiple_columns() -> Result<()> {
        let table_name = "test_table";
        let schema = vec![
            ColumnSchema::new("t1", ConcreteDataType::uint32_datatype(), true),
            ColumnSchema::new(
                "t2",
                ConcreteDataType::timestamp_datatype(TimeUnit::Millisecond),
                false,
            )
            .with_default_constraint(Some(ColumnDefaultConstraint::Function(String::from(
                "current_timestamp()",
            ))))
            .unwrap()
            .with_time_index(true),
        ];
        // One row of data; the values themselves are irrelevant to DESC.
        let data = vec![
            Arc::new(UInt32Vector::from_slice([0])) as _,
            Arc::new(TimestampMillisecondVector::from_slice([0])) as _,
        ];
        // Expected DESC output, column by column (names, types, keys,
        // nullability, defaults, semantic types).
        let expected_columns = vec![
            Arc::new(StringVector::from(vec!["t1", "t2"])) as _,
            Arc::new(StringVector::from(vec!["UInt32", "TimestampMillisecond"])) as _,
            Arc::new(StringVector::from(vec!["", "PRI"])) as _,
            Arc::new(StringVector::from(vec![YES_STR, NO_STR])) as _,
            Arc::new(StringVector::from(vec!["", "current_timestamp()"])) as _,
            Arc::new(StringVector::from(vec![
                SEMANTIC_TYPE_FIELD,
                SEMANTIC_TYPE_TIME_INDEX,
            ])) as _,
        ];

        describe_table_test_by_schema(table_name, schema, data, expected_columns)
    }

    // Shared driver: builds an in-memory table from `schema`/`data`, runs
    // `describe_table`, and compares the record batches against
    // `expected_columns`.
    fn describe_table_test_by_schema(
        table_name: &str,
        schema: Vec<ColumnSchema>,
        data: Vec<VectorRef>,
        expected_columns: Vec<VectorRef>,
    ) -> Result<()> {
        let table_schema = SchemaRef::new(Schema::new(schema));
        let table = prepare_describe_table(table_name, table_schema, data);

        let expected =
            RecordBatches::try_from_columns(DESCRIBE_TABLE_OUTPUT_SCHEMA.clone(), expected_columns)
                .context(error::CreateRecordBatchSnafu)?;

        if let OutputData::RecordBatches(res) = describe_table(table)?.data {
            assert_eq!(res.take(), expected.take());
        } else {
            panic!("describe table must return record batch");
        }

        Ok(())
    }

    // Wraps a single record batch in a `MemTable` for describe tests.
    fn prepare_describe_table(
        table_name: &str,
        table_schema: SchemaRef,
        data: Vec<VectorRef>,
    ) -> TableRef {
        let record_batch = RecordBatch::new(table_schema, data).unwrap();
        MemTable::table(table_name, record_batch)
    }

    // System timezone variables always report UTC, while session timezone
    // variables reflect the query context; names with spaces are rejected.
    #[test]
    fn test_show_variable() {
        assert_eq!(
            exec_show_variable("SYSTEM_TIME_ZONE", "Asia/Shanghai").unwrap(),
            "UTC"
        );
        assert_eq!(
            exec_show_variable("SYSTEM_TIMEZONE", "Asia/Shanghai").unwrap(),
            "UTC"
        );
        assert_eq!(
            exec_show_variable("TIME_ZONE", "Asia/Shanghai").unwrap(),
            "Asia/Shanghai"
        );
        assert_eq!(
            exec_show_variable("TIMEZONE", "Asia/Shanghai").unwrap(),
            "Asia/Shanghai"
        );
        assert!(exec_show_variable("TIME ZONE", "Asia/Shanghai").is_err());
        assert!(exec_show_variable("SYSTEM TIME ZONE", "Asia/Shanghai").is_err());
    }

    // Runs `SHOW <variable>` under a context whose timezone is `tz` and
    // returns the first cell of the result as a string.
    fn exec_show_variable(variable: &str, tz: &str) -> Result<String> {
        let stmt = ShowVariables {
            variable: ObjectName::from(vec![Ident::new(variable)]),
        };
        let ctx = Arc::new(
            QueryContextBuilder::default()
                .timezone(Timezone::from_tz_string(tz).unwrap())
                .build(),
        );
        match show_variable(stmt, ctx) {
            Ok(Output {
                data: OutputData::RecordBatches(record),
                ..
            }) => {
                // Take the first (only) batch and read column 0, row 0.
                let record = record.take().first().cloned().unwrap();
                Ok(record.iter_column_as_string(0).next().unwrap().unwrap())
            }
            Ok(_) => unreachable!(),
            Err(e) => Err(e),
        }
    }
}