1mod show_create_table;
16
17use std::collections::HashMap;
18use std::ops::ControlFlow;
19use std::sync::Arc;
20
21use catalog::CatalogManagerRef;
22use catalog::information_schema::{
23 CHARACTER_SETS, COLLATIONS, COLUMNS, FLOWS, KEY_COLUMN_USAGE, REGION_PEERS, SCHEMATA, TABLES,
24 VIEWS, columns, flows, key_column_usage, process_list, region_peers, schemata, tables,
25};
26use common_catalog::consts::{
27 INFORMATION_SCHEMA_NAME, SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_PRIMARY_KEY,
28 SEMANTIC_TYPE_TIME_INDEX,
29};
30use common_catalog::format_full_table_name;
31use common_datasource::file_format::{FileFormat, Format, infer_schemas};
32use common_datasource::lister::{Lister, Source};
33use common_datasource::object_store::build_backend;
34use common_datasource::util::find_dir_and_filename;
35use common_meta::SchemaOptions;
36use common_meta::key::flow::flow_info::FlowInfoValue;
37use common_query::Output;
38use common_query::prelude::greptime_timestamp;
39use common_recordbatch::RecordBatches;
40use common_recordbatch::adapter::RecordBatchStreamAdapter;
41use common_time::Timestamp;
42use common_time::timezone::get_timezone;
43use datafusion::common::ScalarValue;
44use datafusion::prelude::SessionContext;
45use datafusion_expr::{Expr, SortExpr, case, col, lit};
46use datatypes::prelude::*;
47use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, RawSchema, Schema};
48use datatypes::vectors::StringVector;
49use itertools::Itertools;
50use object_store::ObjectStore;
51use once_cell::sync::Lazy;
52use regex::Regex;
53use session::context::{Channel, QueryContextRef};
54pub use show_create_table::create_table_stmt;
55use snafu::{OptionExt, ResultExt, ensure};
56use sql::ast::{Ident, visit_expressions_mut};
57use sql::parser::ParserContext;
58use sql::statements::OptionMap;
59use sql::statements::create::{CreateDatabase, CreateFlow, CreateView, Partitions, SqlOrTql};
60use sql::statements::show::{
61 ShowColumns, ShowDatabases, ShowFlows, ShowIndex, ShowKind, ShowProcessList, ShowRegion,
62 ShowTableStatus, ShowTables, ShowVariables, ShowViews,
63};
64use sql::statements::statement::Statement;
65use sqlparser::ast::ObjectName;
66use store_api::metric_engine_consts::{is_metric_engine, is_metric_engine_internal_column};
67use table::TableRef;
68use table::metadata::TableInfoRef;
69use table::requests::{FILE_TABLE_LOCATION_KEY, FILE_TABLE_PATTERN_KEY};
70
71use crate::QueryEngineRef;
72use crate::error::{self, Result, UnsupportedVariableSnafu};
73use crate::planner::DfLogicalPlanner;
74
// User-facing column labels for `SHOW DATABASES` / `SHOW TABLES` /
// `SHOW COLUMNS` style output (MySQL-compatible names).
const SCHEMAS_COLUMN: &str = "Database";
const OPTIONS_COLUMN: &str = "Options";
const VIEWS_COLUMN: &str = "Views";
const FLOWS_COLUMN: &str = "Flows";
const FIELD_COLUMN: &str = "Field";
const TABLE_TYPE_COLUMN: &str = "Table_type";
const COLUMN_NAME_COLUMN: &str = "Column";
const COLUMN_GREPTIME_TYPE_COLUMN: &str = "Greptime_type";
const COLUMN_TYPE_COLUMN: &str = "Type";
const COLUMN_KEY_COLUMN: &str = "Key";
const COLUMN_EXTRA_COLUMN: &str = "Extra";
const COLUMN_PRIVILEGES_COLUMN: &str = "Privileges";
const COLUMN_COLLATION_COLUMN: &str = "Collation";
const COLUMN_NULLABLE_COLUMN: &str = "Null";
const COLUMN_DEFAULT_COLUMN: &str = "Default";
const COLUMN_COMMENT_COLUMN: &str = "Comment";
const COLUMN_SEMANTIC_TYPE_COLUMN: &str = "Semantic Type";

// Common cell values used when rendering the outputs above.
const YES_STR: &str = "YES";
const NO_STR: &str = "NO";
const PRI_KEY: &str = "PRI";
const TIME_INDEX: &str = "TIME INDEX";

// Column labels for `SHOW INDEX` output (mirrors MySQL's `SHOW INDEX`).
const INDEX_TABLE_COLUMN: &str = "Table";
// NOTE(review): "NONT" looks like a typo for "NON"; the constant is
// referenced elsewhere in this file, so the name is left unchanged here.
const INDEX_NONT_UNIQUE_COLUMN: &str = "Non_unique";
const INDEX_CARDINALITY_COLUMN: &str = "Cardinality";
const INDEX_SUB_PART_COLUMN: &str = "Sub_part";
const INDEX_PACKED_COLUMN: &str = "Packed";
const INDEX_INDEX_TYPE_COLUMN: &str = "Index_type";
const INDEX_COMMENT_COLUMN: &str = "Index_comment";
const INDEX_VISIBLE_COLUMN: &str = "Visible";
const INDEX_EXPRESSION_COLUMN: &str = "Expression";
const INDEX_KEY_NAME_COLUMN: &str = "Key_name";
const INDEX_SEQ_IN_INDEX_COLUMN: &str = "Seq_in_index";
const INDEX_COLUMN_NAME_COLUMN: &str = "Column_name";
111
112static DESCRIBE_TABLE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
113 Arc::new(Schema::new(vec![
114 ColumnSchema::new(
115 COLUMN_NAME_COLUMN,
116 ConcreteDataType::string_datatype(),
117 false,
118 ),
119 ColumnSchema::new(
120 COLUMN_TYPE_COLUMN,
121 ConcreteDataType::string_datatype(),
122 false,
123 ),
124 ColumnSchema::new(COLUMN_KEY_COLUMN, ConcreteDataType::string_datatype(), true),
125 ColumnSchema::new(
126 COLUMN_NULLABLE_COLUMN,
127 ConcreteDataType::string_datatype(),
128 false,
129 ),
130 ColumnSchema::new(
131 COLUMN_DEFAULT_COLUMN,
132 ConcreteDataType::string_datatype(),
133 false,
134 ),
135 ColumnSchema::new(
136 COLUMN_SEMANTIC_TYPE_COLUMN,
137 ConcreteDataType::string_datatype(),
138 false,
139 ),
140 ]))
141});
142
143static SHOW_CREATE_DATABASE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
144 Arc::new(Schema::new(vec![
145 ColumnSchema::new("Database", ConcreteDataType::string_datatype(), false),
146 ColumnSchema::new(
147 "Create Database",
148 ConcreteDataType::string_datatype(),
149 false,
150 ),
151 ]))
152});
153
154static SHOW_CREATE_TABLE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
155 Arc::new(Schema::new(vec![
156 ColumnSchema::new("Table", ConcreteDataType::string_datatype(), false),
157 ColumnSchema::new("Create Table", ConcreteDataType::string_datatype(), false),
158 ]))
159});
160
161static SHOW_CREATE_FLOW_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
162 Arc::new(Schema::new(vec![
163 ColumnSchema::new("Flow", ConcreteDataType::string_datatype(), false),
164 ColumnSchema::new("Create Flow", ConcreteDataType::string_datatype(), false),
165 ]))
166});
167
168static SHOW_CREATE_VIEW_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
169 Arc::new(Schema::new(vec![
170 ColumnSchema::new("View", ConcreteDataType::string_datatype(), false),
171 ColumnSchema::new("Create View", ConcreteDataType::string_datatype(), false),
172 ]))
173});
174
/// Returns a DataFusion expression for the SQL `NULL` literal.
fn null() -> Expr {
    lit(ScalarValue::Null)
}
178
179pub async fn show_databases(
180 stmt: ShowDatabases,
181 query_engine: &QueryEngineRef,
182 catalog_manager: &CatalogManagerRef,
183 query_ctx: QueryContextRef,
184) -> Result<Output> {
185 let projects = if stmt.full {
186 vec![
187 (schemata::SCHEMA_NAME, SCHEMAS_COLUMN),
188 (schemata::SCHEMA_OPTS, OPTIONS_COLUMN),
189 ]
190 } else {
191 vec![(schemata::SCHEMA_NAME, SCHEMAS_COLUMN)]
192 };
193
194 let filters = vec![col(schemata::CATALOG_NAME).eq(lit(query_ctx.current_catalog()))];
195 let like_field = Some(schemata::SCHEMA_NAME);
196 let sort = vec![col(schemata::SCHEMA_NAME).sort(true, true)];
197
198 query_from_information_schema_table(
199 query_engine,
200 catalog_manager,
201 query_ctx,
202 SCHEMATA,
203 vec![],
204 projects,
205 filters,
206 like_field,
207 sort,
208 stmt.kind,
209 )
210 .await
211}
212
213fn replace_column_in_expr(expr: &mut sqlparser::ast::Expr, from_column: &str, to_column: &str) {
216 let _ = visit_expressions_mut(expr, |e| {
217 match e {
218 sqlparser::ast::Expr::Identifier(ident) => {
219 if ident.value.eq_ignore_ascii_case(from_column) {
220 ident.value = to_column.to_string();
221 }
222 }
223 sqlparser::ast::Expr::CompoundIdentifier(idents) => {
224 if let Some(last) = idents.last_mut()
225 && last.value.eq_ignore_ascii_case(from_column)
226 {
227 last.value = to_column.to_string();
228 }
229 }
230 _ => {}
231 }
232 ControlFlow::<()>::Continue(())
233 });
234}
235
/// Shared implementation for all `SHOW ...` statements: plans and executes a
/// query over one `information_schema` table and streams the result.
///
/// Pipeline (in order): resolve table → apply `filters` → apply the `LIKE`
/// pattern on `like_field` (if the statement kind is `LIKE`) → sort →
/// project (`select` expressions win over `projects` renames) → apply the
/// `WHERE` clause (if the statement kind is `WHERE`) → execute as a stream.
///
/// - `select`: explicit projection expressions; when empty, `projects` is
///   used as a `(source_column, output_alias)` projection instead.
/// - `projects`: also re-applied as column renames after projection so a
///   `select`-based projection still ends up with the user-facing names.
/// - `like_field`: the column a `LIKE` pattern is matched against.
#[allow(clippy::too_many_arguments)]
async fn query_from_information_schema_table(
    query_engine: &QueryEngineRef,
    catalog_manager: &CatalogManagerRef,
    query_ctx: QueryContextRef,
    table_name: &str,
    select: Vec<Expr>,
    projects: Vec<(&str, &str)>,
    filters: Vec<Expr>,
    like_field: Option<&str>,
    sort: Vec<SortExpr>,
    kind: ShowKind,
) -> Result<Output> {
    let table = catalog_manager
        .table(
            query_ctx.current_catalog(),
            INFORMATION_SCHEMA_NAME,
            table_name,
            Some(&query_ctx),
        )
        .await
        .context(error::CatalogSnafu)?
        .with_context(|| error::TableNotFoundSnafu {
            table: format_full_table_name(
                query_ctx.current_catalog(),
                INFORMATION_SCHEMA_NAME,
                table_name,
            ),
        })?;

    let dataframe = query_engine.read_table(table)?;

    // Apply the caller-provided predicates (catalog/schema/table scoping).
    let dataframe = filters.into_iter().try_fold(dataframe, |df, expr| {
        df.filter(expr).context(error::PlanSqlSnafu)
    })?;

    // `SHOW ... LIKE 'pattern'` filters on the designated column.
    let dataframe = if let (ShowKind::Like(ident), Some(field)) = (&kind, like_field) {
        dataframe
            .filter(col(field).like(lit(ident.value.clone())))
            .context(error::PlanSqlSnafu)?
    } else {
        dataframe
    };

    let dataframe = if sort.is_empty() {
        dataframe
    } else {
        dataframe.sort(sort).context(error::PlanSqlSnafu)?
    };

    // Projection: explicit `select` expressions take precedence; otherwise
    // build an aliased projection from the `projects` pairs.
    let dataframe = if select.is_empty() {
        if projects.is_empty() {
            dataframe
        } else {
            let projection = projects
                .iter()
                .map(|x| col(x.0).alias(x.1))
                .collect::<Vec<_>>();
            dataframe.select(projection).context(error::PlanSqlSnafu)?
        }
    } else {
        dataframe.select(select).context(error::PlanSqlSnafu)?
    };

    // Re-apply `projects` as renames so the `select` path also produces the
    // user-facing column names (a no-op for the aliased-projection path).
    let dataframe = projects
        .into_iter()
        .try_fold(dataframe, |df, (column, renamed_column)| {
            df.with_column_renamed(column, renamed_column)
                .context(error::PlanSqlSnafu)
        })?;

    let dataframe = match kind {
        ShowKind::All | ShowKind::Like(_) => {
            dataframe
        }
        ShowKind::Where(filter) => {
            // The WHERE clause references output (renamed) columns, so plan
            // it against a view of the dataframe built so far.
            let view = dataframe.into_view();
            let dataframe = SessionContext::new_with_state(
                query_engine
                    .engine_context(query_ctx.clone())
                    .state()
                    .clone(),
            )
            .read_table(view)?;

            let planner = query_engine.planner();
            let planner = planner
                .as_any()
                .downcast_ref::<DfLogicalPlanner>()
                .expect("Must be the datafusion planner");

            // Convert the parsed SQL expression into a logical expression
            // against the view's schema.
            let filter = planner
                .sql_to_expr(filter, dataframe.schema(), false, query_ctx)
                .await?;

            dataframe.filter(filter).context(error::PlanSqlSnafu)?
        }
    };

    let stream = dataframe.execute_stream().await?;

    Ok(Output::new_with_stream(Box::pin(
        RecordBatchStreamAdapter::try_new(stream).context(error::CreateRecordBatchSnafu)?,
    )))
}
357
358pub async fn show_columns(
360 stmt: ShowColumns,
361 query_engine: &QueryEngineRef,
362 catalog_manager: &CatalogManagerRef,
363 query_ctx: QueryContextRef,
364) -> Result<Output> {
365 let schema_name = if let Some(database) = stmt.database {
366 database
367 } else {
368 query_ctx.current_schema()
369 };
370
371 let projects = if stmt.full {
372 vec![
373 (columns::COLUMN_NAME, FIELD_COLUMN),
374 (columns::DATA_TYPE, COLUMN_TYPE_COLUMN),
375 (columns::COLLATION_NAME, COLUMN_COLLATION_COLUMN),
376 (columns::IS_NULLABLE, COLUMN_NULLABLE_COLUMN),
377 (columns::COLUMN_KEY, COLUMN_KEY_COLUMN),
378 (columns::COLUMN_DEFAULT, COLUMN_DEFAULT_COLUMN),
379 (columns::COLUMN_COMMENT, COLUMN_COMMENT_COLUMN),
380 (columns::PRIVILEGES, COLUMN_PRIVILEGES_COLUMN),
381 (columns::EXTRA, COLUMN_EXTRA_COLUMN),
382 (columns::GREPTIME_DATA_TYPE, COLUMN_GREPTIME_TYPE_COLUMN),
383 ]
384 } else {
385 vec![
386 (columns::COLUMN_NAME, FIELD_COLUMN),
387 (columns::DATA_TYPE, COLUMN_TYPE_COLUMN),
388 (columns::IS_NULLABLE, COLUMN_NULLABLE_COLUMN),
389 (columns::COLUMN_KEY, COLUMN_KEY_COLUMN),
390 (columns::COLUMN_DEFAULT, COLUMN_DEFAULT_COLUMN),
391 (columns::EXTRA, COLUMN_EXTRA_COLUMN),
392 (columns::GREPTIME_DATA_TYPE, COLUMN_GREPTIME_TYPE_COLUMN),
393 ]
394 };
395
396 let filters = vec![
397 col(columns::TABLE_NAME).eq(lit(&stmt.table)),
398 col(columns::TABLE_SCHEMA).eq(lit(schema_name.clone())),
399 col(columns::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
400 ];
401 let like_field = Some(columns::COLUMN_NAME);
402 let sort = vec![col(columns::COLUMN_NAME).sort(true, true)];
403
404 query_from_information_schema_table(
405 query_engine,
406 catalog_manager,
407 query_ctx,
408 COLUMNS,
409 vec![],
410 projects,
411 filters,
412 like_field,
413 sort,
414 stmt.kind,
415 )
416 .await
417}
418
/// Executes `SHOW INDEX FROM <table> [FROM <db>]`.
///
/// Builds a MySQL-`SHOW INDEX`-shaped result from
/// `information_schema.key_column_usage`. Columns that have no equivalent in
/// GreptimeDB (cardinality, sub-part, packed, expression) are filled with
/// NULLs; `Non_unique` is always 1 and `Visible` is always "YES".
pub async fn show_index(
    stmt: ShowIndex,
    query_engine: &QueryEngineRef,
    catalog_manager: &CatalogManagerRef,
    query_ctx: QueryContextRef,
) -> Result<Output> {
    // Default to the session's current schema when none is given.
    let schema_name = if let Some(database) = stmt.database {
        database
    } else {
        query_ctx.current_schema()
    };

    let select = vec![
        col(key_column_usage::TABLE_NAME).alias(INDEX_TABLE_COLUMN),
        lit(1).alias(INDEX_NONT_UNIQUE_COLUMN),
        col(key_column_usage::CONSTRAINT_NAME).alias(INDEX_KEY_NAME_COLUMN),
        col(key_column_usage::ORDINAL_POSITION).alias(INDEX_SEQ_IN_INDEX_COLUMN),
        col(key_column_usage::COLUMN_NAME).alias(INDEX_COLUMN_NAME_COLUMN),
        lit("A").alias(COLUMN_COLLATION_COLUMN),
        null().alias(INDEX_CARDINALITY_COLUMN),
        null().alias(INDEX_SUB_PART_COLUMN),
        null().alias(INDEX_PACKED_COLUMN),
        // The time index column is NOT NULL; everything else is reported
        // as nullable.
        case(col(key_column_usage::CONSTRAINT_NAME))
            .when(lit(TIME_INDEX), lit(NO_STR))
            .otherwise(lit(YES_STR))
            .context(error::PlanSqlSnafu)?
            .alias(COLUMN_NULLABLE_COLUMN),
        col(key_column_usage::GREPTIME_INDEX_TYPE).alias(INDEX_INDEX_TYPE_COLUMN),
        lit("").alias(COLUMN_COMMENT_COLUMN),
        lit("").alias(INDEX_COMMENT_COLUMN),
        lit(YES_STR).alias(INDEX_VISIBLE_COLUMN),
        null().alias(INDEX_EXPRESSION_COLUMN),
    ];

    // Identity renames for already-aliased columns; source-name renames for
    // the rest, so the output carries the MySQL-compatible labels.
    let projects = vec![
        (key_column_usage::TABLE_NAME, INDEX_TABLE_COLUMN),
        (INDEX_NONT_UNIQUE_COLUMN, INDEX_NONT_UNIQUE_COLUMN),
        (key_column_usage::CONSTRAINT_NAME, INDEX_KEY_NAME_COLUMN),
        (
            key_column_usage::ORDINAL_POSITION,
            INDEX_SEQ_IN_INDEX_COLUMN,
        ),
        (key_column_usage::COLUMN_NAME, INDEX_COLUMN_NAME_COLUMN),
        (COLUMN_COLLATION_COLUMN, COLUMN_COLLATION_COLUMN),
        (INDEX_CARDINALITY_COLUMN, INDEX_CARDINALITY_COLUMN),
        (INDEX_SUB_PART_COLUMN, INDEX_SUB_PART_COLUMN),
        (INDEX_PACKED_COLUMN, INDEX_PACKED_COLUMN),
        (COLUMN_NULLABLE_COLUMN, COLUMN_NULLABLE_COLUMN),
        (
            key_column_usage::GREPTIME_INDEX_TYPE,
            INDEX_INDEX_TYPE_COLUMN,
        ),
        (COLUMN_COMMENT_COLUMN, COLUMN_COMMENT_COLUMN),
        (INDEX_COMMENT_COLUMN, INDEX_COMMENT_COLUMN),
        (INDEX_VISIBLE_COLUMN, INDEX_VISIBLE_COLUMN),
        (INDEX_EXPRESSION_COLUMN, INDEX_EXPRESSION_COLUMN),
    ];

    let filters = vec![
        col(key_column_usage::TABLE_NAME).eq(lit(&stmt.table)),
        col(key_column_usage::TABLE_SCHEMA).eq(lit(schema_name.clone())),
        col(key_column_usage::REAL_TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
    ];
    let like_field = None;
    // NOTE(review): this sorts by the `columns::COLUMN_NAME` constant even
    // though the source table is key_column_usage — presumably both constants
    // hold the same column name; verify against the catalog definitions.
    let sort = vec![col(columns::COLUMN_NAME).sort(true, true)];

    query_from_information_schema_table(
        query_engine,
        catalog_manager,
        query_ctx,
        KEY_COLUMN_USAGE,
        select,
        projects,
        filters,
        like_field,
        sort,
        stmt.kind,
    )
    .await
}
506
/// Executes `SHOW REGION FROM <table> [FROM <db>]`.
///
/// Reads `information_schema.region_peers` scoped to the target table and
/// reports each region, its peer, and whether that peer is the leader.
pub async fn show_region(
    stmt: ShowRegion,
    query_engine: &QueryEngineRef,
    catalog_manager: &CatalogManagerRef,
    query_ctx: QueryContextRef,
) -> Result<Output> {
    // Default to the session's current schema when none is given.
    let schema_name = if let Some(database) = stmt.database {
        database
    } else {
        query_ctx.current_schema()
    };

    let filters = vec![
        col(region_peers::TABLE_NAME).eq(lit(&stmt.table)),
        col(region_peers::TABLE_SCHEMA).eq(lit(schema_name.clone())),
        col(region_peers::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
    ];
    let projects = vec![
        (region_peers::TABLE_NAME, "Table"),
        (region_peers::REGION_ID, "Region"),
        (region_peers::PEER_ID, "Peer"),
        (region_peers::IS_LEADER, "Leader"),
    ];

    let like_field = None;
    // NOTE(review): the sort keys come from the `columns::` constants rather
    // than `region_peers::` — presumably the constant values are identical;
    // verify against the information_schema definitions.
    let sort = vec![
        col(columns::REGION_ID).sort(true, true),
        col(columns::PEER_ID).sort(true, true),
    ];

    query_from_information_schema_table(
        query_engine,
        catalog_manager,
        query_ctx,
        REGION_PEERS,
        vec![],
        projects,
        filters,
        like_field,
        sort,
        stmt.kind,
    )
    .await
}
552
553pub async fn show_tables(
555 stmt: ShowTables,
556 query_engine: &QueryEngineRef,
557 catalog_manager: &CatalogManagerRef,
558 query_ctx: QueryContextRef,
559) -> Result<Output> {
560 let schema_name = if let Some(database) = stmt.database {
561 database
562 } else {
563 query_ctx.current_schema()
564 };
565
566 let tables_column = format!("Tables_in_{}", schema_name);
568 let projects = if stmt.full {
569 vec![
570 (tables::TABLE_NAME, tables_column.as_str()),
571 (tables::TABLE_TYPE, TABLE_TYPE_COLUMN),
572 ]
573 } else {
574 vec![(tables::TABLE_NAME, tables_column.as_str())]
575 };
576 let filters = vec![
577 col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
578 col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
579 ];
580 let like_field = Some(tables::TABLE_NAME);
581 let sort = vec![col(tables::TABLE_NAME).sort(true, true)];
582
583 let kind = match stmt.kind {
586 ShowKind::Where(mut filter) => {
587 replace_column_in_expr(&mut filter, "Tables", &tables_column);
588 ShowKind::Where(filter)
589 }
590 other => other,
591 };
592
593 query_from_information_schema_table(
594 query_engine,
595 catalog_manager,
596 query_ctx,
597 TABLES,
598 vec![],
599 projects,
600 filters,
601 like_field,
602 sort,
603 kind,
604 )
605 .await
606}
607
608pub async fn show_table_status(
610 stmt: ShowTableStatus,
611 query_engine: &QueryEngineRef,
612 catalog_manager: &CatalogManagerRef,
613 query_ctx: QueryContextRef,
614) -> Result<Output> {
615 let schema_name = if let Some(database) = stmt.database {
616 database
617 } else {
618 query_ctx.current_schema()
619 };
620
621 let projects = vec![
623 (tables::TABLE_NAME, "Name"),
624 (tables::ENGINE, "Engine"),
625 (tables::VERSION, "Version"),
626 (tables::ROW_FORMAT, "Row_format"),
627 (tables::TABLE_ROWS, "Rows"),
628 (tables::AVG_ROW_LENGTH, "Avg_row_length"),
629 (tables::DATA_LENGTH, "Data_length"),
630 (tables::MAX_DATA_LENGTH, "Max_data_length"),
631 (tables::INDEX_LENGTH, "Index_length"),
632 (tables::DATA_FREE, "Data_free"),
633 (tables::AUTO_INCREMENT, "Auto_increment"),
634 (tables::CREATE_TIME, "Create_time"),
635 (tables::UPDATE_TIME, "Update_time"),
636 (tables::CHECK_TIME, "Check_time"),
637 (tables::TABLE_COLLATION, "Collation"),
638 (tables::CHECKSUM, "Checksum"),
639 (tables::CREATE_OPTIONS, "Create_options"),
640 (tables::TABLE_COMMENT, "Comment"),
641 ];
642
643 let filters = vec![
644 col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
645 col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
646 ];
647 let like_field = Some(tables::TABLE_NAME);
648 let sort = vec![col(tables::TABLE_NAME).sort(true, true)];
649
650 query_from_information_schema_table(
651 query_engine,
652 catalog_manager,
653 query_ctx,
654 TABLES,
655 vec![],
656 projects,
657 filters,
658 like_field,
659 sort,
660 stmt.kind,
661 )
662 .await
663}
664
665pub async fn show_collations(
667 kind: ShowKind,
668 query_engine: &QueryEngineRef,
669 catalog_manager: &CatalogManagerRef,
670 query_ctx: QueryContextRef,
671) -> Result<Output> {
672 let projects = vec![
674 ("collation_name", "Collation"),
675 ("character_set_name", "Charset"),
676 ("id", "Id"),
677 ("is_default", "Default"),
678 ("is_compiled", "Compiled"),
679 ("sortlen", "Sortlen"),
680 ];
681
682 let filters = vec![];
683 let like_field = Some("collation_name");
684 let sort = vec![];
685
686 query_from_information_schema_table(
687 query_engine,
688 catalog_manager,
689 query_ctx,
690 COLLATIONS,
691 vec![],
692 projects,
693 filters,
694 like_field,
695 sort,
696 kind,
697 )
698 .await
699}
700
701pub async fn show_charsets(
703 kind: ShowKind,
704 query_engine: &QueryEngineRef,
705 catalog_manager: &CatalogManagerRef,
706 query_ctx: QueryContextRef,
707) -> Result<Output> {
708 let projects = vec![
710 ("character_set_name", "Charset"),
711 ("description", "Description"),
712 ("default_collate_name", "Default collation"),
713 ("maxlen", "Maxlen"),
714 ];
715
716 let filters = vec![];
717 let like_field = Some("character_set_name");
718 let sort = vec![];
719
720 query_from_information_schema_table(
721 query_engine,
722 catalog_manager,
723 query_ctx,
724 CHARACTER_SETS,
725 vec![],
726 projects,
727 filters,
728 like_field,
729 sort,
730 kind,
731 )
732 .await
733}
734
735pub fn show_variable(stmt: ShowVariables, query_ctx: QueryContextRef) -> Result<Output> {
736 let variable = stmt.variable.to_string().to_uppercase();
737 let value = match variable.as_str() {
738 "SYSTEM_TIME_ZONE" | "SYSTEM_TIMEZONE" => get_timezone(None).to_string(),
739 "TIME_ZONE" | "TIMEZONE" => query_ctx.timezone().to_string(),
740 "READ_PREFERENCE" => query_ctx.read_preference().to_string(),
741 "DATESTYLE" => {
742 let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style();
743 format!("{}, {}", style, order)
744 }
745 "INTERVALSTYLE" => {
746 let style = *query_ctx
747 .configuration_parameter()
748 .pg_intervalstyle_format();
749 style.to_string()
750 }
751 "MAX_EXECUTION_TIME" => {
752 if query_ctx.channel() == Channel::Mysql {
753 query_ctx.query_timeout_as_millis().to_string()
754 } else {
755 return UnsupportedVariableSnafu { name: variable }.fail();
756 }
757 }
758 "STATEMENT_TIMEOUT" => {
759 if query_ctx.channel() == Channel::Postgres {
761 let mut timeout = query_ctx.query_timeout_as_millis().to_string();
762 timeout.push_str("ms");
763 timeout
764 } else {
765 return UnsupportedVariableSnafu { name: variable }.fail();
766 }
767 }
768 _ => return UnsupportedVariableSnafu { name: variable }.fail(),
769 };
770 let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
771 variable,
772 ConcreteDataType::string_datatype(),
773 false,
774 )]));
775 let records = RecordBatches::try_from_columns(
776 schema,
777 vec![Arc::new(StringVector::from(vec![value])) as _],
778 )
779 .context(error::CreateRecordBatchSnafu)?;
780 Ok(Output::new_with_record_batches(records))
781}
782
783pub async fn show_status(_query_ctx: QueryContextRef) -> Result<Output> {
784 let schema = Arc::new(Schema::new(vec![
785 ColumnSchema::new("Variable_name", ConcreteDataType::string_datatype(), false),
786 ColumnSchema::new("Value", ConcreteDataType::string_datatype(), true),
787 ]));
788 let records = RecordBatches::try_from_columns(
789 schema,
790 vec![
791 Arc::new(StringVector::from(Vec::<&str>::new())) as _,
792 Arc::new(StringVector::from(Vec::<&str>::new())) as _,
793 ],
794 )
795 .context(error::CreateRecordBatchSnafu)?;
796 Ok(Output::new_with_record_batches(records))
797}
798
799pub async fn show_search_path(_query_ctx: QueryContextRef) -> Result<Output> {
800 let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
801 "search_path",
802 ConcreteDataType::string_datatype(),
803 false,
804 )]));
805 let records = RecordBatches::try_from_columns(
806 schema,
807 vec![Arc::new(StringVector::from(vec![_query_ctx.current_schema()])) as _],
808 )
809 .context(error::CreateRecordBatchSnafu)?;
810 Ok(Output::new_with_record_batches(records))
811}
812
813pub fn show_create_database(database_name: &str, options: OptionMap) -> Result<Output> {
814 let stmt = CreateDatabase {
815 name: ObjectName::from(vec![Ident::new(database_name)]),
816 if_not_exists: true,
817 options,
818 };
819 let sql = format!("{stmt}");
820 let columns = vec![
821 Arc::new(StringVector::from(vec![database_name.to_string()])) as _,
822 Arc::new(StringVector::from(vec![sql])) as _,
823 ];
824 let records =
825 RecordBatches::try_from_columns(SHOW_CREATE_DATABASE_OUTPUT_SCHEMA.clone(), columns)
826 .context(error::CreateRecordBatchSnafu)?;
827 Ok(Output::new_with_record_batches(records))
828}
829
830pub fn show_create_table(
831 table_info: TableInfoRef,
832 schema_options: Option<SchemaOptions>,
833 partitions: Option<Partitions>,
834 query_ctx: QueryContextRef,
835) -> Result<Output> {
836 let table_name = table_info.name.clone();
837
838 let quote_style = query_ctx.quote_style();
839
840 let mut stmt = create_table_stmt(&table_info, schema_options, quote_style)?;
841 stmt.partitions = partitions.map(|mut p| {
842 p.set_quote(quote_style);
843 p
844 });
845 let sql = format!("{}", stmt);
846 let columns = vec![
847 Arc::new(StringVector::from(vec![table_name])) as _,
848 Arc::new(StringVector::from(vec![sql])) as _,
849 ];
850 let records = RecordBatches::try_from_columns(SHOW_CREATE_TABLE_OUTPUT_SCHEMA.clone(), columns)
851 .context(error::CreateRecordBatchSnafu)?;
852
853 Ok(Output::new_with_record_batches(records))
854}
855
856pub fn show_create_foreign_table_for_pg(
857 table: TableRef,
858 _query_ctx: QueryContextRef,
859) -> Result<Output> {
860 let table_info = table.table_info();
861
862 let table_meta = &table_info.meta;
863 let table_name = &table_info.name;
864 let schema = &table_info.meta.schema;
865 let is_metric_engine = is_metric_engine(&table_meta.engine);
866
867 let columns = schema
868 .column_schemas()
869 .iter()
870 .filter_map(|c| {
871 if is_metric_engine && is_metric_engine_internal_column(&c.name) {
872 None
873 } else {
874 Some(format!(
875 "\"{}\" {}",
876 c.name,
877 c.data_type.postgres_datatype_name()
878 ))
879 }
880 })
881 .join(",\n ");
882
883 let sql = format!(
884 r#"CREATE FOREIGN TABLE ft_{} (
885 {}
886)
887SERVER greptimedb
888OPTIONS (table_name '{}')"#,
889 table_name, columns, table_name
890 );
891
892 let columns = vec![
893 Arc::new(StringVector::from(vec![table_name.clone()])) as _,
894 Arc::new(StringVector::from(vec![sql])) as _,
895 ];
896 let records = RecordBatches::try_from_columns(SHOW_CREATE_TABLE_OUTPUT_SCHEMA.clone(), columns)
897 .context(error::CreateRecordBatchSnafu)?;
898
899 Ok(Output::new_with_record_batches(records))
900}
901
902pub fn show_create_view(
903 view_name: ObjectName,
904 definition: &str,
905 query_ctx: QueryContextRef,
906) -> Result<Output> {
907 let mut parser_ctx =
908 ParserContext::new(query_ctx.sql_dialect(), definition).context(error::SqlSnafu)?;
909
910 let Statement::CreateView(create_view) =
911 parser_ctx.parse_statement().context(error::SqlSnafu)?
912 else {
913 unreachable!();
915 };
916
917 let stmt = CreateView {
918 name: view_name.clone(),
919 columns: create_view.columns,
920 query: create_view.query,
921 or_replace: create_view.or_replace,
922 if_not_exists: create_view.if_not_exists,
923 };
924
925 let sql = format!("{}", stmt);
926 let columns = vec![
927 Arc::new(StringVector::from(vec![view_name.to_string()])) as _,
928 Arc::new(StringVector::from(vec![sql])) as _,
929 ];
930 let records = RecordBatches::try_from_columns(SHOW_CREATE_VIEW_OUTPUT_SCHEMA.clone(), columns)
931 .context(error::CreateRecordBatchSnafu)?;
932
933 Ok(Output::new_with_record_batches(records))
934}
935
936pub async fn show_views(
938 stmt: ShowViews,
939 query_engine: &QueryEngineRef,
940 catalog_manager: &CatalogManagerRef,
941 query_ctx: QueryContextRef,
942) -> Result<Output> {
943 let schema_name = if let Some(database) = stmt.database {
944 database
945 } else {
946 query_ctx.current_schema()
947 };
948
949 let projects = vec![(tables::TABLE_NAME, VIEWS_COLUMN)];
950 let filters = vec![
951 col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
952 col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
953 ];
954 let like_field = Some(tables::TABLE_NAME);
955 let sort = vec![col(tables::TABLE_NAME).sort(true, true)];
956
957 query_from_information_schema_table(
958 query_engine,
959 catalog_manager,
960 query_ctx,
961 VIEWS,
962 vec![],
963 projects,
964 filters,
965 like_field,
966 sort,
967 stmt.kind,
968 )
969 .await
970}
971
972pub async fn show_flows(
974 stmt: ShowFlows,
975 query_engine: &QueryEngineRef,
976 catalog_manager: &CatalogManagerRef,
977 query_ctx: QueryContextRef,
978) -> Result<Output> {
979 let projects = vec![(flows::FLOW_NAME, FLOWS_COLUMN)];
980 let filters = vec![col(flows::TABLE_CATALOG).eq(lit(query_ctx.current_catalog()))];
981 let like_field = Some(flows::FLOW_NAME);
982 let sort = vec![col(flows::FLOW_NAME).sort(true, true)];
983
984 query_from_information_schema_table(
985 query_engine,
986 catalog_manager,
987 query_ctx,
988 FLOWS,
989 vec![],
990 projects,
991 filters,
992 like_field,
993 sort,
994 stmt.kind,
995 )
996 .await
997}
998
/// Executes `SHOW TRIGGERS [LIKE ... | WHERE ...]` against
/// `information_schema.triggers` (enterprise builds only).
#[cfg(feature = "enterprise")]
pub async fn show_triggers(
    stmt: sql::statements::show::trigger::ShowTriggers,
    query_engine: &QueryEngineRef,
    catalog_manager: &CatalogManagerRef,
    query_ctx: QueryContextRef,
) -> Result<Output> {
    // information_schema column name and its user-facing label.
    const TRIGGER_NAME: &str = "trigger_name";
    const TRIGGERS_COLUMN: &str = "Triggers";

    query_from_information_schema_table(
        query_engine,
        catalog_manager,
        query_ctx,
        catalog::information_schema::TRIGGERS,
        vec![],
        vec![(TRIGGER_NAME, TRIGGERS_COLUMN)],
        vec![],
        Some(TRIGGER_NAME),
        vec![col(TRIGGER_NAME).sort(true, true)],
        stmt.kind,
    )
    .await
}
1027
/// Executes `SHOW CREATE FLOW` by re-parsing the flow's stored SQL and
/// rendering a `CREATE FLOW` statement from the flow metadata.
pub fn show_create_flow(
    flow_name: ObjectName,
    flow_val: FlowInfoValue,
    query_ctx: QueryContextRef,
) -> Result<Output> {
    let mut parser_ctx =
        ParserContext::new(query_ctx.sql_dialect(), flow_val.raw_sql()).context(error::SqlSnafu)?;

    let query = parser_ctx.parse_statement().context(error::SqlSnafu)?;

    // TQL statements keep the original raw text; SQL statements are
    // round-tripped through the parser's canonical rendering.
    let raw_query = match &query {
        Statement::Tql(_) => flow_val.raw_sql().clone(),
        _ => query.to_string(),
    };

    let query = Box::new(SqlOrTql::try_from_statement(query, &raw_query).context(error::SqlSnafu)?);

    // An empty stored comment is rendered as "no comment".
    let comment = if flow_val.comment().is_empty() {
        None
    } else {
        Some(flow_val.comment().clone())
    };

    let stmt = CreateFlow {
        flow_name,
        sink_table_name: ObjectName::from(vec![Ident::new(&flow_val.sink_table_name().table_name)]),
        or_replace: false,
        if_not_exists: true,
        expire_after: flow_val.expire_after(),
        eval_interval: flow_val.eval_interval(),
        comment,
        query,
    };

    let sql = format!("{}", stmt);
    let columns = vec![
        Arc::new(StringVector::from(vec![flow_val.flow_name().clone()])) as _,
        Arc::new(StringVector::from(vec![sql])) as _,
    ];
    let records = RecordBatches::try_from_columns(SHOW_CREATE_FLOW_OUTPUT_SCHEMA.clone(), columns)
        .context(error::CreateRecordBatchSnafu)?;

    Ok(Output::new_with_record_batches(records))
}
1075
1076pub fn describe_table(table: TableRef) -> Result<Output> {
1077 let table_info = table.table_info();
1078 let columns_schemas = table_info.meta.schema.column_schemas();
1079 let columns = vec![
1080 describe_column_names(columns_schemas),
1081 describe_column_types(columns_schemas),
1082 describe_column_keys(columns_schemas, &table_info.meta.primary_key_indices),
1083 describe_column_nullables(columns_schemas),
1084 describe_column_defaults(columns_schemas),
1085 describe_column_semantic_types(columns_schemas, &table_info.meta.primary_key_indices),
1086 ];
1087 let records = RecordBatches::try_from_columns(DESCRIBE_TABLE_OUTPUT_SCHEMA.clone(), columns)
1088 .context(error::CreateRecordBatchSnafu)?;
1089 Ok(Output::new_with_record_batches(records))
1090}
1091
1092fn describe_column_names(columns_schemas: &[ColumnSchema]) -> VectorRef {
1093 Arc::new(StringVector::from_iterator(
1094 columns_schemas.iter().map(|cs| cs.name.as_str()),
1095 ))
1096}
1097
1098fn describe_column_types(columns_schemas: &[ColumnSchema]) -> VectorRef {
1099 Arc::new(StringVector::from(
1100 columns_schemas
1101 .iter()
1102 .map(|cs| cs.data_type.name())
1103 .collect::<Vec<_>>(),
1104 ))
1105}
1106
1107fn describe_column_keys(
1108 columns_schemas: &[ColumnSchema],
1109 primary_key_indices: &[usize],
1110) -> VectorRef {
1111 Arc::new(StringVector::from_iterator(
1112 columns_schemas.iter().enumerate().map(|(i, cs)| {
1113 if cs.is_time_index() || primary_key_indices.contains(&i) {
1114 PRI_KEY
1115 } else {
1116 ""
1117 }
1118 }),
1119 ))
1120}
1121
1122fn describe_column_nullables(columns_schemas: &[ColumnSchema]) -> VectorRef {
1123 Arc::new(StringVector::from_iterator(columns_schemas.iter().map(
1124 |cs| {
1125 if cs.is_nullable() { YES_STR } else { NO_STR }
1126 },
1127 )))
1128}
1129
1130fn describe_column_defaults(columns_schemas: &[ColumnSchema]) -> VectorRef {
1131 Arc::new(StringVector::from(
1132 columns_schemas
1133 .iter()
1134 .map(|cs| {
1135 cs.default_constraint()
1136 .map_or(String::from(""), |dc| dc.to_string())
1137 })
1138 .collect::<Vec<String>>(),
1139 ))
1140}
1141
1142fn describe_column_semantic_types(
1143 columns_schemas: &[ColumnSchema],
1144 primary_key_indices: &[usize],
1145) -> VectorRef {
1146 Arc::new(StringVector::from_iterator(
1147 columns_schemas.iter().enumerate().map(|(i, cs)| {
1148 if primary_key_indices.contains(&i) {
1149 SEMANTIC_TYPE_PRIMARY_KEY
1150 } else if cs.is_time_index() {
1151 SEMANTIC_TYPE_TIME_INDEX
1152 } else {
1153 SEMANTIC_TYPE_FIELD
1154 }
1155 }),
1156 ))
1157}
1158
1159pub async fn prepare_file_table_files(
1161 options: &HashMap<String, String>,
1162) -> Result<(ObjectStore, Vec<String>)> {
1163 let url = options
1164 .get(FILE_TABLE_LOCATION_KEY)
1165 .context(error::MissingRequiredFieldSnafu {
1166 name: FILE_TABLE_LOCATION_KEY,
1167 })?;
1168
1169 let (dir, filename) = find_dir_and_filename(url);
1170 let source = if let Some(filename) = filename {
1171 Source::Filename(filename)
1172 } else {
1173 Source::Dir
1174 };
1175 let regex = options
1176 .get(FILE_TABLE_PATTERN_KEY)
1177 .map(|x| Regex::new(x))
1178 .transpose()
1179 .context(error::BuildRegexSnafu)?;
1180 let object_store = build_backend(url, options).context(error::BuildBackendSnafu)?;
1181 let lister = Lister::new(object_store.clone(), source, dir, regex);
1182 let files = lister
1187 .list()
1188 .await
1189 .context(error::ListObjectsSnafu)?
1190 .into_iter()
1191 .filter_map(|entry| {
1192 if entry.path().ends_with('/') {
1193 None
1194 } else {
1195 Some(entry.path().to_string())
1196 }
1197 })
1198 .collect::<Vec<_>>();
1199 Ok((object_store, files))
1200}
1201
1202pub async fn infer_file_table_schema(
1203 object_store: &ObjectStore,
1204 files: &[String],
1205 options: &HashMap<String, String>,
1206) -> Result<RawSchema> {
1207 let format = parse_file_table_format(options)?;
1208 let merged = infer_schemas(object_store, files, format.as_ref())
1209 .await
1210 .context(error::InferSchemaSnafu)?;
1211 Ok(RawSchema::from(
1212 &Schema::try_from(merged).context(error::ConvertSchemaSnafu)?,
1213 ))
1214}
1215
1216pub fn file_column_schemas_to_table(
1226 file_column_schemas: &[ColumnSchema],
1227) -> (Vec<ColumnSchema>, String) {
1228 let mut column_schemas = file_column_schemas.to_owned();
1229 if let Some(time_index_column) = column_schemas.iter().find(|c| c.is_time_index()) {
1230 let time_index = time_index_column.name.clone();
1231 return (column_schemas, time_index);
1232 }
1233
1234 let timestamp_type = ConcreteDataType::timestamp_millisecond_datatype();
1235 let default_zero = Value::Timestamp(Timestamp::new_millisecond(0));
1236 let timestamp_column_schema = ColumnSchema::new(greptime_timestamp(), timestamp_type, false)
1237 .with_time_index(true)
1238 .with_default_constraint(Some(ColumnDefaultConstraint::Value(default_zero)))
1239 .unwrap();
1240
1241 if let Some(column_schema) = column_schemas
1242 .iter_mut()
1243 .find(|column_schema| column_schema.name == greptime_timestamp())
1244 {
1245 *column_schema = timestamp_column_schema;
1247 } else {
1248 column_schemas.push(timestamp_column_schema);
1249 }
1250
1251 (column_schemas, greptime_timestamp().to_string())
1252}
1253
1254pub fn check_file_to_table_schema_compatibility(
1263 file_column_schemas: &[ColumnSchema],
1264 table_column_schemas: &[ColumnSchema],
1265) -> Result<()> {
1266 let file_schemas_map = file_column_schemas
1267 .iter()
1268 .map(|s| (s.name.clone(), s))
1269 .collect::<HashMap<_, _>>();
1270
1271 for table_column in table_column_schemas {
1272 if let Some(file_column) = file_schemas_map.get(&table_column.name) {
1273 ensure!(
1275 file_column
1276 .data_type
1277 .can_arrow_type_cast_to(&table_column.data_type),
1278 error::ColumnSchemaIncompatibleSnafu {
1279 column: table_column.name.clone(),
1280 file_type: file_column.data_type.clone(),
1281 table_type: table_column.data_type.clone(),
1282 }
1283 );
1284 } else {
1285 ensure!(
1286 table_column.is_nullable() || table_column.default_constraint().is_some(),
1287 error::ColumnSchemaNoDefaultSnafu {
1288 column: table_column.name.clone(),
1289 }
1290 );
1291 }
1292 }
1293
1294 Ok(())
1295}
1296
1297fn parse_file_table_format(options: &HashMap<String, String>) -> Result<Box<dyn FileFormat>> {
1298 Ok(
1299 match Format::try_from(options).context(error::ParseFileFormatSnafu)? {
1300 Format::Csv(format) => Box::new(format),
1301 Format::Json(format) => Box::new(format),
1302 Format::Parquet(format) => Box::new(format),
1303 Format::Orc(format) => Box::new(format),
1304 },
1305 )
1306}
1307
1308pub async fn show_processlist(
1309 stmt: ShowProcessList,
1310 query_engine: &QueryEngineRef,
1311 catalog_manager: &CatalogManagerRef,
1312 query_ctx: QueryContextRef,
1313) -> Result<Output> {
1314 let projects = if stmt.full {
1315 vec![
1316 (process_list::ID, "Id"),
1317 (process_list::CATALOG, "Catalog"),
1318 (process_list::SCHEMAS, "Schema"),
1319 (process_list::CLIENT, "Client"),
1320 (process_list::FRONTEND, "Frontend"),
1321 (process_list::START_TIMESTAMP, "Start Time"),
1322 (process_list::ELAPSED_TIME, "Elapsed Time"),
1323 (process_list::QUERY, "Query"),
1324 ]
1325 } else {
1326 vec![
1327 (process_list::ID, "Id"),
1328 (process_list::CATALOG, "Catalog"),
1329 (process_list::QUERY, "Query"),
1330 (process_list::ELAPSED_TIME, "Elapsed Time"),
1331 ]
1332 };
1333
1334 let filters = vec![];
1335 let like_field = None;
1336 let sort = vec![col("id").sort(true, true)];
1337 query_from_information_schema_table(
1338 query_engine,
1339 catalog_manager,
1340 query_ctx.clone(),
1341 "process_list",
1342 vec![],
1343 projects.clone(),
1344 filters,
1345 like_field,
1346 sort,
1347 ShowKind::All,
1348 )
1349 .await
1350}
1351
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use common_query::{Output, OutputData};
    use common_recordbatch::{RecordBatch, RecordBatches};
    use common_time::Timezone;
    use common_time::timestamp::TimeUnit;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema, SchemaRef};
    use datatypes::vectors::{StringVector, TimestampMillisecondVector, UInt32Vector, VectorRef};
    use session::context::QueryContextBuilder;
    use snafu::ResultExt;
    use sql::ast::{Ident, ObjectName};
    use sql::statements::show::ShowVariables;
    use table::TableRef;
    use table::test_util::MemTable;

    use super::show_variable;
    use crate::error;
    use crate::error::Result;
    use crate::sql::{
        DESCRIBE_TABLE_OUTPUT_SCHEMA, NO_STR, SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_TIME_INDEX,
        YES_STR, describe_table,
    };

    // DESCRIBE over a table with a nullable field column and a non-nullable
    // time-index column carrying a function default; checks every output
    // column (name, type, key flag, nullability, default, semantic type).
    #[test]
    fn test_describe_table_multiple_columns() -> Result<()> {
        let table_name = "test_table";
        let schema = vec![
            ColumnSchema::new("t1", ConcreteDataType::uint32_datatype(), true),
            ColumnSchema::new(
                "t2",
                ConcreteDataType::timestamp_datatype(TimeUnit::Millisecond),
                false,
            )
            .with_default_constraint(Some(ColumnDefaultConstraint::Function(String::from(
                "current_timestamp()",
            ))))
            .unwrap()
            .with_time_index(true),
        ];
        let data = vec![
            Arc::new(UInt32Vector::from_slice([0])) as _,
            Arc::new(TimestampMillisecondVector::from_slice([0])) as _,
        ];
        // Expected DESCRIBE output, one vector per output column.
        let expected_columns = vec![
            Arc::new(StringVector::from(vec!["t1", "t2"])) as _,
            Arc::new(StringVector::from(vec!["UInt32", "TimestampMillisecond"])) as _,
            Arc::new(StringVector::from(vec!["", "PRI"])) as _,
            Arc::new(StringVector::from(vec![YES_STR, NO_STR])) as _,
            Arc::new(StringVector::from(vec!["", "current_timestamp()"])) as _,
            Arc::new(StringVector::from(vec![
                SEMANTIC_TYPE_FIELD,
                SEMANTIC_TYPE_TIME_INDEX,
            ])) as _,
        ];

        describe_table_test_by_schema(table_name, schema, data, expected_columns)
    }

    // Runs `describe_table` against an in-memory table built from `schema`
    // and `data`, asserting the record batches equal `expected_columns`.
    fn describe_table_test_by_schema(
        table_name: &str,
        schema: Vec<ColumnSchema>,
        data: Vec<VectorRef>,
        expected_columns: Vec<VectorRef>,
    ) -> Result<()> {
        let table_schema = SchemaRef::new(Schema::new(schema));
        let table = prepare_describe_table(table_name, table_schema, data);

        let expected =
            RecordBatches::try_from_columns(DESCRIBE_TABLE_OUTPUT_SCHEMA.clone(), expected_columns)
                .context(error::CreateRecordBatchSnafu)?;

        if let OutputData::RecordBatches(res) = describe_table(table)?.data {
            assert_eq!(res.take(), expected.take());
        } else {
            panic!("describe table must return record batch");
        }

        Ok(())
    }

    // Builds an in-memory table fixture from a schema and column data.
    fn prepare_describe_table(
        table_name: &str,
        table_schema: SchemaRef,
        data: Vec<VectorRef>,
    ) -> TableRef {
        let record_batch = RecordBatch::new(table_schema, data).unwrap();
        MemTable::table(table_name, record_batch)
    }

    // SHOW VARIABLES: system timezone variables always report UTC while the
    // session timezone variables report the session setting; space-separated
    // variable names are rejected.
    #[test]
    fn test_show_variable() {
        assert_eq!(
            exec_show_variable("SYSTEM_TIME_ZONE", "Asia/Shanghai").unwrap(),
            "UTC"
        );
        assert_eq!(
            exec_show_variable("SYSTEM_TIMEZONE", "Asia/Shanghai").unwrap(),
            "UTC"
        );
        assert_eq!(
            exec_show_variable("TIME_ZONE", "Asia/Shanghai").unwrap(),
            "Asia/Shanghai"
        );
        assert_eq!(
            exec_show_variable("TIMEZONE", "Asia/Shanghai").unwrap(),
            "Asia/Shanghai"
        );
        assert!(exec_show_variable("TIME ZONE", "Asia/Shanghai").is_err());
        assert!(exec_show_variable("SYSTEM TIME ZONE", "Asia/Shanghai").is_err());
    }

    // Executes `SHOW VARIABLES <variable>` under a session timezone `tz` and
    // returns the first cell of the result as a string.
    fn exec_show_variable(variable: &str, tz: &str) -> Result<String> {
        let stmt = ShowVariables {
            variable: ObjectName::from(vec![Ident::new(variable)]),
        };
        let ctx = Arc::new(
            QueryContextBuilder::default()
                .timezone(Timezone::from_tz_string(tz).unwrap())
                .build(),
        );
        match show_variable(stmt, ctx) {
            Ok(Output {
                data: OutputData::RecordBatches(record),
                ..
            }) => {
                let record = record.take().first().cloned().unwrap();
                Ok(record.iter_column_as_string(0).next().unwrap().unwrap())
            }
            Ok(_) => unreachable!(),
            Err(e) => Err(e),
        }
    }
}