query/
sql.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod show_create_table;
16
17use std::collections::HashMap;
18use std::sync::Arc;
19
20use catalog::information_schema::{
21    columns, flows, key_column_usage, region_peers, schemata, tables, CHARACTER_SETS, COLLATIONS,
22    COLUMNS, FLOWS, KEY_COLUMN_USAGE, REGION_PEERS, SCHEMATA, TABLES, VIEWS,
23};
24use catalog::CatalogManagerRef;
25use common_catalog::consts::{
26    INFORMATION_SCHEMA_NAME, SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_PRIMARY_KEY,
27    SEMANTIC_TYPE_TIME_INDEX,
28};
29use common_catalog::format_full_table_name;
30use common_datasource::file_format::{infer_schemas, FileFormat, Format};
31use common_datasource::lister::{Lister, Source};
32use common_datasource::object_store::build_backend;
33use common_datasource::util::find_dir_and_filename;
34use common_meta::key::flow::flow_info::FlowInfoValue;
35use common_meta::SchemaOptions;
36use common_query::prelude::GREPTIME_TIMESTAMP;
37use common_query::Output;
38use common_recordbatch::adapter::RecordBatchStreamAdapter;
39use common_recordbatch::RecordBatches;
40use common_time::timezone::get_timezone;
41use common_time::Timestamp;
42use datafusion::common::ScalarValue;
43use datafusion::prelude::SessionContext;
44use datafusion_expr::expr::WildcardOptions;
45use datafusion_expr::{case, col, lit, Expr, SortExpr};
46use datatypes::prelude::*;
47use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, RawSchema, Schema};
48use datatypes::vectors::StringVector;
49use itertools::Itertools;
50use object_store::ObjectStore;
51use once_cell::sync::Lazy;
52use regex::Regex;
53use session::context::{Channel, QueryContextRef};
54pub use show_create_table::create_table_stmt;
55use snafu::{ensure, OptionExt, ResultExt};
56use sql::ast::Ident;
57use sql::parser::ParserContext;
58use sql::statements::create::{CreateDatabase, CreateFlow, CreateView, Partitions};
59use sql::statements::show::{
60    ShowColumns, ShowDatabases, ShowFlows, ShowIndex, ShowKind, ShowRegion, ShowTableStatus,
61    ShowTables, ShowVariables, ShowViews,
62};
63use sql::statements::statement::Statement;
64use sql::statements::OptionMap;
65use sqlparser::ast::ObjectName;
66use store_api::metric_engine_consts::{is_metric_engine, is_metric_engine_internal_column};
67use table::requests::{FILE_TABLE_LOCATION_KEY, FILE_TABLE_PATTERN_KEY};
68use table::TableRef;
69
70use crate::dataframe::DataFrame;
71use crate::error::{self, Result, UnsupportedVariableSnafu};
72use crate::planner::DfLogicalPlanner;
73use crate::QueryEngineRef;
74
75const SCHEMAS_COLUMN: &str = "Database";
76const OPTIONS_COLUMN: &str = "Options";
77const TABLES_COLUMN: &str = "Tables";
78const VIEWS_COLUMN: &str = "Views";
79const FLOWS_COLUMN: &str = "Flows";
80const FIELD_COLUMN: &str = "Field";
81const TABLE_TYPE_COLUMN: &str = "Table_type";
82const COLUMN_NAME_COLUMN: &str = "Column";
83const COLUMN_GREPTIME_TYPE_COLUMN: &str = "Greptime_type";
84const COLUMN_TYPE_COLUMN: &str = "Type";
85const COLUMN_KEY_COLUMN: &str = "Key";
86const COLUMN_EXTRA_COLUMN: &str = "Extra";
87const COLUMN_PRIVILEGES_COLUMN: &str = "Privileges";
88const COLUMN_COLLATION_COLUMN: &str = "Collation";
89const COLUMN_NULLABLE_COLUMN: &str = "Null";
90const COLUMN_DEFAULT_COLUMN: &str = "Default";
91const COLUMN_COMMENT_COLUMN: &str = "Comment";
92const COLUMN_SEMANTIC_TYPE_COLUMN: &str = "Semantic Type";
93
94const YES_STR: &str = "YES";
95const NO_STR: &str = "NO";
96const PRI_KEY: &str = "PRI";
97const TIME_INDEX: &str = "TIME INDEX";
98
99/// SHOW index columns
100const INDEX_TABLE_COLUMN: &str = "Table";
101const INDEX_NONT_UNIQUE_COLUMN: &str = "Non_unique";
102const INDEX_CARDINALITY_COLUMN: &str = "Cardinality";
103const INDEX_SUB_PART_COLUMN: &str = "Sub_part";
104const INDEX_PACKED_COLUMN: &str = "Packed";
105const INDEX_INDEX_TYPE_COLUMN: &str = "Index_type";
106const INDEX_COMMENT_COLUMN: &str = "Index_comment";
107const INDEX_VISIBLE_COLUMN: &str = "Visible";
108const INDEX_EXPRESSION_COLUMN: &str = "Expression";
109const INDEX_KEY_NAME_COLUMN: &str = "Key_name";
110const INDEX_SEQ_IN_INDEX_COLUMN: &str = "Seq_in_index";
111const INDEX_COLUMN_NAME_COLUMN: &str = "Column_name";
112
113static DESCRIBE_TABLE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
114    Arc::new(Schema::new(vec![
115        ColumnSchema::new(
116            COLUMN_NAME_COLUMN,
117            ConcreteDataType::string_datatype(),
118            false,
119        ),
120        ColumnSchema::new(
121            COLUMN_TYPE_COLUMN,
122            ConcreteDataType::string_datatype(),
123            false,
124        ),
125        ColumnSchema::new(COLUMN_KEY_COLUMN, ConcreteDataType::string_datatype(), true),
126        ColumnSchema::new(
127            COLUMN_NULLABLE_COLUMN,
128            ConcreteDataType::string_datatype(),
129            false,
130        ),
131        ColumnSchema::new(
132            COLUMN_DEFAULT_COLUMN,
133            ConcreteDataType::string_datatype(),
134            false,
135        ),
136        ColumnSchema::new(
137            COLUMN_SEMANTIC_TYPE_COLUMN,
138            ConcreteDataType::string_datatype(),
139            false,
140        ),
141    ]))
142});
143
144static SHOW_CREATE_DATABASE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
145    Arc::new(Schema::new(vec![
146        ColumnSchema::new("Database", ConcreteDataType::string_datatype(), false),
147        ColumnSchema::new(
148            "Create Database",
149            ConcreteDataType::string_datatype(),
150            false,
151        ),
152    ]))
153});
154
155static SHOW_CREATE_TABLE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
156    Arc::new(Schema::new(vec![
157        ColumnSchema::new("Table", ConcreteDataType::string_datatype(), false),
158        ColumnSchema::new("Create Table", ConcreteDataType::string_datatype(), false),
159    ]))
160});
161
162static SHOW_CREATE_FLOW_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
163    Arc::new(Schema::new(vec![
164        ColumnSchema::new("Flow", ConcreteDataType::string_datatype(), false),
165        ColumnSchema::new("Create Flow", ConcreteDataType::string_datatype(), false),
166    ]))
167});
168
169static SHOW_CREATE_VIEW_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
170    Arc::new(Schema::new(vec![
171        ColumnSchema::new("View", ConcreteDataType::string_datatype(), false),
172        ColumnSchema::new("Create View", ConcreteDataType::string_datatype(), false),
173    ]))
174});
175
176fn null() -> Expr {
177    lit(ScalarValue::Null)
178}
179
180pub async fn show_databases(
181    stmt: ShowDatabases,
182    query_engine: &QueryEngineRef,
183    catalog_manager: &CatalogManagerRef,
184    query_ctx: QueryContextRef,
185) -> Result<Output> {
186    let projects = if stmt.full {
187        vec![
188            (schemata::SCHEMA_NAME, SCHEMAS_COLUMN),
189            (schemata::SCHEMA_OPTS, OPTIONS_COLUMN),
190        ]
191    } else {
192        vec![(schemata::SCHEMA_NAME, SCHEMAS_COLUMN)]
193    };
194
195    let filters = vec![col(schemata::CATALOG_NAME).eq(lit(query_ctx.current_catalog()))];
196    let like_field = Some(schemata::SCHEMA_NAME);
197    let sort = vec![col(schemata::SCHEMA_NAME).sort(true, true)];
198
199    query_from_information_schema_table(
200        query_engine,
201        catalog_manager,
202        query_ctx,
203        SCHEMATA,
204        vec![],
205        projects,
206        filters,
207        like_field,
208        sort,
209        stmt.kind,
210    )
211    .await
212}
213
214/// Cast a `show` statement execution into a query from tables in  `information_schema`.
215/// - `table_name`: the table name in `information_schema`,
216/// - `projects`: query projection, a list of `(column, renamed_column)`,
217/// - `filters`: filter expressions for query,
218/// - `like_field`: the field to filter by the predicate `ShowKind::Like`,
219/// - `sort`: sort the results by the specified sorting expressions,
220/// - `kind`: the show kind
221#[allow(clippy::too_many_arguments)]
222async fn query_from_information_schema_table(
223    query_engine: &QueryEngineRef,
224    catalog_manager: &CatalogManagerRef,
225    query_ctx: QueryContextRef,
226    table_name: &str,
227    select: Vec<Expr>,
228    projects: Vec<(&str, &str)>,
229    filters: Vec<Expr>,
230    like_field: Option<&str>,
231    sort: Vec<SortExpr>,
232    kind: ShowKind,
233) -> Result<Output> {
234    let table = catalog_manager
235        .table(
236            query_ctx.current_catalog(),
237            INFORMATION_SCHEMA_NAME,
238            table_name,
239            Some(&query_ctx),
240        )
241        .await
242        .context(error::CatalogSnafu)?
243        .with_context(|| error::TableNotFoundSnafu {
244            table: format_full_table_name(
245                query_ctx.current_catalog(),
246                INFORMATION_SCHEMA_NAME,
247                table_name,
248            ),
249        })?;
250
251    let DataFrame::DataFusion(dataframe) = query_engine.read_table(table)?;
252
253    // Apply select
254    let dataframe = if select.is_empty() {
255        dataframe
256    } else {
257        dataframe.select(select).context(error::PlanSqlSnafu)?
258    };
259
260    // Apply filters
261    let dataframe = filters.into_iter().try_fold(dataframe, |df, expr| {
262        df.filter(expr).context(error::PlanSqlSnafu)
263    })?;
264
265    // Apply `like` predicate if exists
266    let dataframe = if let (ShowKind::Like(ident), Some(field)) = (&kind, like_field) {
267        dataframe
268            .filter(col(field).like(lit(ident.value.clone())))
269            .context(error::PlanSqlSnafu)?
270    } else {
271        dataframe
272    };
273
274    // Apply sorting
275    let dataframe = dataframe
276        .sort(sort)
277        .context(error::PlanSqlSnafu)?
278        .select_columns(&projects.iter().map(|(c, _)| *c).collect::<Vec<_>>())
279        .context(error::PlanSqlSnafu)?;
280
281    // Apply projection
282    let dataframe = projects
283        .into_iter()
284        .try_fold(dataframe, |df, (column, renamed_column)| {
285            df.with_column_renamed(column, renamed_column)
286                .context(error::PlanSqlSnafu)
287        })?;
288
289    let dataframe = match kind {
290        ShowKind::All | ShowKind::Like(_) => {
291            // Like kind is processed above
292            dataframe
293        }
294        ShowKind::Where(filter) => {
295            // Cast the results into VIEW for `where` clause,
296            // which is evaluated against the column names displayed by the SHOW statement.
297            let view = dataframe.into_view();
298            let dataframe = SessionContext::new_with_state(
299                query_engine
300                    .engine_context(query_ctx.clone())
301                    .state()
302                    .clone(),
303            )
304            .read_table(view)?;
305
306            let planner = query_engine.planner();
307            let planner = planner
308                .as_any()
309                .downcast_ref::<DfLogicalPlanner>()
310                .expect("Must be the datafusion planner");
311
312            let filter = planner
313                .sql_to_expr(filter, dataframe.schema(), false, query_ctx)
314                .await?;
315
316            // Apply the `where` clause filters
317            dataframe.filter(filter).context(error::PlanSqlSnafu)?
318        }
319    };
320
321    let stream = dataframe.execute_stream().await?;
322
323    Ok(Output::new_with_stream(Box::pin(
324        RecordBatchStreamAdapter::try_new(stream).context(error::CreateRecordBatchSnafu)?,
325    )))
326}
327
328/// Execute `SHOW COLUMNS` statement.
329pub async fn show_columns(
330    stmt: ShowColumns,
331    query_engine: &QueryEngineRef,
332    catalog_manager: &CatalogManagerRef,
333    query_ctx: QueryContextRef,
334) -> Result<Output> {
335    let schema_name = if let Some(database) = stmt.database {
336        database
337    } else {
338        query_ctx.current_schema()
339    };
340
341    let projects = if stmt.full {
342        vec![
343            (columns::COLUMN_NAME, FIELD_COLUMN),
344            (columns::DATA_TYPE, COLUMN_TYPE_COLUMN),
345            (columns::COLLATION_NAME, COLUMN_COLLATION_COLUMN),
346            (columns::IS_NULLABLE, COLUMN_NULLABLE_COLUMN),
347            (columns::COLUMN_KEY, COLUMN_KEY_COLUMN),
348            (columns::COLUMN_DEFAULT, COLUMN_DEFAULT_COLUMN),
349            (columns::COLUMN_COMMENT, COLUMN_COMMENT_COLUMN),
350            (columns::PRIVILEGES, COLUMN_PRIVILEGES_COLUMN),
351            (columns::EXTRA, COLUMN_EXTRA_COLUMN),
352            (columns::GREPTIME_DATA_TYPE, COLUMN_GREPTIME_TYPE_COLUMN),
353        ]
354    } else {
355        vec![
356            (columns::COLUMN_NAME, FIELD_COLUMN),
357            (columns::DATA_TYPE, COLUMN_TYPE_COLUMN),
358            (columns::IS_NULLABLE, COLUMN_NULLABLE_COLUMN),
359            (columns::COLUMN_KEY, COLUMN_KEY_COLUMN),
360            (columns::COLUMN_DEFAULT, COLUMN_DEFAULT_COLUMN),
361            (columns::EXTRA, COLUMN_EXTRA_COLUMN),
362            (columns::GREPTIME_DATA_TYPE, COLUMN_GREPTIME_TYPE_COLUMN),
363        ]
364    };
365
366    let filters = vec![
367        col(columns::TABLE_NAME).eq(lit(&stmt.table)),
368        col(columns::TABLE_SCHEMA).eq(lit(schema_name.clone())),
369        col(columns::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
370    ];
371    let like_field = Some(columns::COLUMN_NAME);
372    let sort = vec![col(columns::COLUMN_NAME).sort(true, true)];
373
374    query_from_information_schema_table(
375        query_engine,
376        catalog_manager,
377        query_ctx,
378        COLUMNS,
379        vec![],
380        projects,
381        filters,
382        like_field,
383        sort,
384        stmt.kind,
385    )
386    .await
387}
388
389/// Execute `SHOW INDEX` statement.
390pub async fn show_index(
391    stmt: ShowIndex,
392    query_engine: &QueryEngineRef,
393    catalog_manager: &CatalogManagerRef,
394    query_ctx: QueryContextRef,
395) -> Result<Output> {
396    let schema_name = if let Some(database) = stmt.database {
397        database
398    } else {
399        query_ctx.current_schema()
400    };
401
402    let select = vec![
403        // 1 as `Non_unique`: contain duplicates
404        lit(1).alias(INDEX_NONT_UNIQUE_COLUMN),
405        // How the column is sorted in the index: A (ascending).
406        lit("A").alias(COLUMN_COLLATION_COLUMN),
407        null().alias(INDEX_CARDINALITY_COLUMN),
408        null().alias(INDEX_SUB_PART_COLUMN),
409        null().alias(INDEX_PACKED_COLUMN),
410        // case `constraint_name`
411        //    when 'TIME INDEX' then 'NO'
412        //    else 'YES'
413        // end as `Null`
414        case(col(key_column_usage::CONSTRAINT_NAME))
415            .when(lit(TIME_INDEX), lit(NO_STR))
416            .otherwise(lit(YES_STR))
417            .context(error::PlanSqlSnafu)?
418            .alias(COLUMN_NULLABLE_COLUMN),
419        lit("").alias(COLUMN_COMMENT_COLUMN),
420        lit("").alias(INDEX_COMMENT_COLUMN),
421        lit(YES_STR).alias(INDEX_VISIBLE_COLUMN),
422        null().alias(INDEX_EXPRESSION_COLUMN),
423        Expr::Wildcard {
424            qualifier: None,
425            options: Box::new(WildcardOptions::default()),
426        },
427    ];
428
429    let projects = vec![
430        (key_column_usage::TABLE_NAME, INDEX_TABLE_COLUMN),
431        (INDEX_NONT_UNIQUE_COLUMN, INDEX_NONT_UNIQUE_COLUMN),
432        (key_column_usage::CONSTRAINT_NAME, INDEX_KEY_NAME_COLUMN),
433        (
434            key_column_usage::ORDINAL_POSITION,
435            INDEX_SEQ_IN_INDEX_COLUMN,
436        ),
437        (key_column_usage::COLUMN_NAME, INDEX_COLUMN_NAME_COLUMN),
438        (COLUMN_COLLATION_COLUMN, COLUMN_COLLATION_COLUMN),
439        (INDEX_CARDINALITY_COLUMN, INDEX_CARDINALITY_COLUMN),
440        (INDEX_SUB_PART_COLUMN, INDEX_SUB_PART_COLUMN),
441        (INDEX_PACKED_COLUMN, INDEX_PACKED_COLUMN),
442        (COLUMN_NULLABLE_COLUMN, COLUMN_NULLABLE_COLUMN),
443        (
444            key_column_usage::GREPTIME_INDEX_TYPE,
445            INDEX_INDEX_TYPE_COLUMN,
446        ),
447        (COLUMN_COMMENT_COLUMN, COLUMN_COMMENT_COLUMN),
448        (INDEX_COMMENT_COLUMN, INDEX_COMMENT_COLUMN),
449        (INDEX_VISIBLE_COLUMN, INDEX_VISIBLE_COLUMN),
450        (INDEX_EXPRESSION_COLUMN, INDEX_EXPRESSION_COLUMN),
451    ];
452
453    let filters = vec![
454        col(key_column_usage::TABLE_NAME).eq(lit(&stmt.table)),
455        col(key_column_usage::TABLE_SCHEMA).eq(lit(schema_name.clone())),
456        col(key_column_usage::REAL_TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
457    ];
458    let like_field = None;
459    let sort = vec![col(columns::COLUMN_NAME).sort(true, true)];
460
461    query_from_information_schema_table(
462        query_engine,
463        catalog_manager,
464        query_ctx,
465        KEY_COLUMN_USAGE,
466        select,
467        projects,
468        filters,
469        like_field,
470        sort,
471        stmt.kind,
472    )
473    .await
474}
475
476/// Execute `SHOW REGION` statement.
477pub async fn show_region(
478    stmt: ShowRegion,
479    query_engine: &QueryEngineRef,
480    catalog_manager: &CatalogManagerRef,
481    query_ctx: QueryContextRef,
482) -> Result<Output> {
483    let schema_name = if let Some(database) = stmt.database {
484        database
485    } else {
486        query_ctx.current_schema()
487    };
488
489    let filters = vec![
490        col(region_peers::TABLE_NAME).eq(lit(&stmt.table)),
491        col(region_peers::TABLE_SCHEMA).eq(lit(schema_name.clone())),
492        col(region_peers::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
493    ];
494    let projects = vec![
495        (region_peers::TABLE_NAME, "Table"),
496        (region_peers::REGION_ID, "Region"),
497        (region_peers::PEER_ID, "Peer"),
498        (region_peers::IS_LEADER, "Leader"),
499    ];
500
501    let like_field = None;
502    let sort = vec![
503        col(columns::REGION_ID).sort(true, true),
504        col(columns::PEER_ID).sort(true, true),
505    ];
506
507    query_from_information_schema_table(
508        query_engine,
509        catalog_manager,
510        query_ctx,
511        REGION_PEERS,
512        vec![],
513        projects,
514        filters,
515        like_field,
516        sort,
517        stmt.kind,
518    )
519    .await
520}
521
522/// Execute [`ShowTables`] statement and return the [`Output`] if success.
523pub async fn show_tables(
524    stmt: ShowTables,
525    query_engine: &QueryEngineRef,
526    catalog_manager: &CatalogManagerRef,
527    query_ctx: QueryContextRef,
528) -> Result<Output> {
529    let schema_name = if let Some(database) = stmt.database {
530        database
531    } else {
532        query_ctx.current_schema()
533    };
534
535    // (dennis): MySQL rename `table_name` to `Tables_in_{schema}`, but we use `Tables` instead.
536    // I don't want to modify this currently, our dashboard may depend on it.
537    let projects = if stmt.full {
538        vec![
539            (tables::TABLE_NAME, TABLES_COLUMN),
540            (tables::TABLE_TYPE, TABLE_TYPE_COLUMN),
541        ]
542    } else {
543        vec![(tables::TABLE_NAME, TABLES_COLUMN)]
544    };
545    let filters = vec![
546        col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
547        col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
548    ];
549    let like_field = Some(tables::TABLE_NAME);
550    let sort = vec![col(tables::TABLE_NAME).sort(true, true)];
551
552    query_from_information_schema_table(
553        query_engine,
554        catalog_manager,
555        query_ctx,
556        TABLES,
557        vec![],
558        projects,
559        filters,
560        like_field,
561        sort,
562        stmt.kind,
563    )
564    .await
565}
566
567/// Execute [`ShowTableStatus`] statement and return the [`Output`] if success.
568pub async fn show_table_status(
569    stmt: ShowTableStatus,
570    query_engine: &QueryEngineRef,
571    catalog_manager: &CatalogManagerRef,
572    query_ctx: QueryContextRef,
573) -> Result<Output> {
574    let schema_name = if let Some(database) = stmt.database {
575        database
576    } else {
577        query_ctx.current_schema()
578    };
579
580    // Refer to https://dev.mysql.com/doc/refman/8.4/en/show-table-status.html
581    let projects = vec![
582        (tables::TABLE_NAME, "Name"),
583        (tables::ENGINE, "Engine"),
584        (tables::VERSION, "Version"),
585        (tables::ROW_FORMAT, "Row_format"),
586        (tables::TABLE_ROWS, "Rows"),
587        (tables::AVG_ROW_LENGTH, "Avg_row_length"),
588        (tables::DATA_LENGTH, "Data_length"),
589        (tables::MAX_DATA_LENGTH, "Max_data_length"),
590        (tables::INDEX_LENGTH, "Index_length"),
591        (tables::DATA_FREE, "Data_free"),
592        (tables::AUTO_INCREMENT, "Auto_increment"),
593        (tables::CREATE_TIME, "Create_time"),
594        (tables::UPDATE_TIME, "Update_time"),
595        (tables::CHECK_TIME, "Check_time"),
596        (tables::TABLE_COLLATION, "Collation"),
597        (tables::CHECKSUM, "Checksum"),
598        (tables::CREATE_OPTIONS, "Create_options"),
599        (tables::TABLE_COMMENT, "Comment"),
600    ];
601
602    let filters = vec![
603        col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
604        col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
605    ];
606    let like_field = Some(tables::TABLE_NAME);
607    let sort = vec![col(tables::TABLE_NAME).sort(true, true)];
608
609    query_from_information_schema_table(
610        query_engine,
611        catalog_manager,
612        query_ctx,
613        TABLES,
614        vec![],
615        projects,
616        filters,
617        like_field,
618        sort,
619        stmt.kind,
620    )
621    .await
622}
623
624/// Execute `SHOW COLLATION` statement and returns the `Output` if success.
625pub async fn show_collations(
626    kind: ShowKind,
627    query_engine: &QueryEngineRef,
628    catalog_manager: &CatalogManagerRef,
629    query_ctx: QueryContextRef,
630) -> Result<Output> {
631    // Refer to https://dev.mysql.com/doc/refman/8.0/en/show-collation.html
632    let projects = vec![
633        ("collation_name", "Collation"),
634        ("character_set_name", "Charset"),
635        ("id", "Id"),
636        ("is_default", "Default"),
637        ("is_compiled", "Compiled"),
638        ("sortlen", "Sortlen"),
639    ];
640
641    let filters = vec![];
642    let like_field = Some("collation_name");
643    let sort = vec![];
644
645    query_from_information_schema_table(
646        query_engine,
647        catalog_manager,
648        query_ctx,
649        COLLATIONS,
650        vec![],
651        projects,
652        filters,
653        like_field,
654        sort,
655        kind,
656    )
657    .await
658}
659
660/// Execute `SHOW CHARSET` statement and returns the `Output` if success.
661pub async fn show_charsets(
662    kind: ShowKind,
663    query_engine: &QueryEngineRef,
664    catalog_manager: &CatalogManagerRef,
665    query_ctx: QueryContextRef,
666) -> Result<Output> {
667    // Refer to https://dev.mysql.com/doc/refman/8.0/en/show-character-set.html
668    let projects = vec![
669        ("character_set_name", "Charset"),
670        ("description", "Description"),
671        ("default_collate_name", "Default collation"),
672        ("maxlen", "Maxlen"),
673    ];
674
675    let filters = vec![];
676    let like_field = Some("character_set_name");
677    let sort = vec![];
678
679    query_from_information_schema_table(
680        query_engine,
681        catalog_manager,
682        query_ctx,
683        CHARACTER_SETS,
684        vec![],
685        projects,
686        filters,
687        like_field,
688        sort,
689        kind,
690    )
691    .await
692}
693
694pub fn show_variable(stmt: ShowVariables, query_ctx: QueryContextRef) -> Result<Output> {
695    let variable = stmt.variable.to_string().to_uppercase();
696    let value = match variable.as_str() {
697        "SYSTEM_TIME_ZONE" | "SYSTEM_TIMEZONE" => get_timezone(None).to_string(),
698        "TIME_ZONE" | "TIMEZONE" => query_ctx.timezone().to_string(),
699        "READ_PREFERENCE" => query_ctx.read_preference().to_string(),
700        "DATESTYLE" => {
701            let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style();
702            format!("{}, {}", style, order)
703        }
704        "MAX_EXECUTION_TIME" => {
705            if query_ctx.channel() == Channel::Mysql {
706                query_ctx.query_timeout_as_millis().to_string()
707            } else {
708                return UnsupportedVariableSnafu { name: variable }.fail();
709            }
710        }
711        "STATEMENT_TIMEOUT" => {
712            // Add time units to postgres query timeout display.
713            if query_ctx.channel() == Channel::Postgres {
714                let mut timeout = query_ctx.query_timeout_as_millis().to_string();
715                timeout.push_str("ms");
716                timeout
717            } else {
718                return UnsupportedVariableSnafu { name: variable }.fail();
719            }
720        }
721        _ => return UnsupportedVariableSnafu { name: variable }.fail(),
722    };
723    let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
724        variable,
725        ConcreteDataType::string_datatype(),
726        false,
727    )]));
728    let records = RecordBatches::try_from_columns(
729        schema,
730        vec![Arc::new(StringVector::from(vec![value])) as _],
731    )
732    .context(error::CreateRecordBatchSnafu)?;
733    Ok(Output::new_with_record_batches(records))
734}
735
736pub async fn show_status(_query_ctx: QueryContextRef) -> Result<Output> {
737    let schema = Arc::new(Schema::new(vec![
738        ColumnSchema::new("Variable_name", ConcreteDataType::string_datatype(), false),
739        ColumnSchema::new("Value", ConcreteDataType::string_datatype(), true),
740    ]));
741    let records = RecordBatches::try_from_columns(
742        schema,
743        vec![
744            Arc::new(StringVector::from(Vec::<&str>::new())) as _,
745            Arc::new(StringVector::from(Vec::<&str>::new())) as _,
746        ],
747    )
748    .context(error::CreateRecordBatchSnafu)?;
749    Ok(Output::new_with_record_batches(records))
750}
751
752pub async fn show_search_path(_query_ctx: QueryContextRef) -> Result<Output> {
753    let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
754        "search_path",
755        ConcreteDataType::string_datatype(),
756        false,
757    )]));
758    let records = RecordBatches::try_from_columns(
759        schema,
760        vec![Arc::new(StringVector::from(vec![_query_ctx.current_schema()])) as _],
761    )
762    .context(error::CreateRecordBatchSnafu)?;
763    Ok(Output::new_with_record_batches(records))
764}
765
766pub fn show_create_database(database_name: &str, options: OptionMap) -> Result<Output> {
767    let stmt = CreateDatabase {
768        name: ObjectName(vec![Ident::new(database_name)]),
769        if_not_exists: true,
770        options,
771    };
772    let sql = format!("{stmt}");
773    let columns = vec![
774        Arc::new(StringVector::from(vec![database_name.to_string()])) as _,
775        Arc::new(StringVector::from(vec![sql])) as _,
776    ];
777    let records =
778        RecordBatches::try_from_columns(SHOW_CREATE_DATABASE_OUTPUT_SCHEMA.clone(), columns)
779            .context(error::CreateRecordBatchSnafu)?;
780    Ok(Output::new_with_record_batches(records))
781}
782
783pub fn show_create_table(
784    table: TableRef,
785    schema_options: Option<SchemaOptions>,
786    partitions: Option<Partitions>,
787    query_ctx: QueryContextRef,
788) -> Result<Output> {
789    let table_info = table.table_info();
790    let table_name = &table_info.name;
791
792    let quote_style = query_ctx.quote_style();
793
794    let mut stmt = create_table_stmt(&table_info, schema_options, quote_style)?;
795    stmt.partitions = partitions.map(|mut p| {
796        p.set_quote(quote_style);
797        p
798    });
799    let sql = format!("{}", stmt);
800    let columns = vec![
801        Arc::new(StringVector::from(vec![table_name.clone()])) as _,
802        Arc::new(StringVector::from(vec![sql])) as _,
803    ];
804    let records = RecordBatches::try_from_columns(SHOW_CREATE_TABLE_OUTPUT_SCHEMA.clone(), columns)
805        .context(error::CreateRecordBatchSnafu)?;
806
807    Ok(Output::new_with_record_batches(records))
808}
809
810pub fn show_create_foreign_table_for_pg(
811    table: TableRef,
812    _query_ctx: QueryContextRef,
813) -> Result<Output> {
814    let table_info = table.table_info();
815
816    let table_meta = &table_info.meta;
817    let table_name = &table_info.name;
818    let schema = &table_info.meta.schema;
819    let is_metric_engine = is_metric_engine(&table_meta.engine);
820
821    let columns = schema
822        .column_schemas()
823        .iter()
824        .filter_map(|c| {
825            if is_metric_engine && is_metric_engine_internal_column(&c.name) {
826                None
827            } else {
828                Some(format!(
829                    "\"{}\" {}",
830                    c.name,
831                    c.data_type.postgres_datatype_name()
832                ))
833            }
834        })
835        .join(",\n  ");
836
837    let sql = format!(
838        r#"CREATE FOREIGN TABLE ft_{} (
839  {}
840)
841SERVER greptimedb
842OPTIONS (table_name '{}')"#,
843        table_name, columns, table_name
844    );
845
846    let columns = vec![
847        Arc::new(StringVector::from(vec![table_name.clone()])) as _,
848        Arc::new(StringVector::from(vec![sql])) as _,
849    ];
850    let records = RecordBatches::try_from_columns(SHOW_CREATE_TABLE_OUTPUT_SCHEMA.clone(), columns)
851        .context(error::CreateRecordBatchSnafu)?;
852
853    Ok(Output::new_with_record_batches(records))
854}
855
856pub fn show_create_view(
857    view_name: ObjectName,
858    definition: &str,
859    query_ctx: QueryContextRef,
860) -> Result<Output> {
861    let mut parser_ctx =
862        ParserContext::new(query_ctx.sql_dialect(), definition).context(error::SqlSnafu)?;
863
864    let Statement::CreateView(create_view) =
865        parser_ctx.parse_statement().context(error::SqlSnafu)?
866    else {
867        // MUST be `CreateView` statement.
868        unreachable!();
869    };
870
871    let stmt = CreateView {
872        name: view_name.clone(),
873        columns: create_view.columns,
874        query: create_view.query,
875        or_replace: create_view.or_replace,
876        if_not_exists: create_view.if_not_exists,
877    };
878
879    let sql = format!("{}", stmt);
880    let columns = vec![
881        Arc::new(StringVector::from(vec![view_name.to_string()])) as _,
882        Arc::new(StringVector::from(vec![sql])) as _,
883    ];
884    let records = RecordBatches::try_from_columns(SHOW_CREATE_VIEW_OUTPUT_SCHEMA.clone(), columns)
885        .context(error::CreateRecordBatchSnafu)?;
886
887    Ok(Output::new_with_record_batches(records))
888}
889
890/// Execute [`ShowViews`] statement and return the [`Output`] if success.
891pub async fn show_views(
892    stmt: ShowViews,
893    query_engine: &QueryEngineRef,
894    catalog_manager: &CatalogManagerRef,
895    query_ctx: QueryContextRef,
896) -> Result<Output> {
897    let schema_name = if let Some(database) = stmt.database {
898        database
899    } else {
900        query_ctx.current_schema()
901    };
902
903    let projects = vec![(tables::TABLE_NAME, VIEWS_COLUMN)];
904    let filters = vec![
905        col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
906        col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
907    ];
908    let like_field = Some(tables::TABLE_NAME);
909    let sort = vec![col(tables::TABLE_NAME).sort(true, true)];
910
911    query_from_information_schema_table(
912        query_engine,
913        catalog_manager,
914        query_ctx,
915        VIEWS,
916        vec![],
917        projects,
918        filters,
919        like_field,
920        sort,
921        stmt.kind,
922    )
923    .await
924}
925
926/// Execute [`ShowFlows`] statement and return the [`Output`] if success.
927pub async fn show_flows(
928    stmt: ShowFlows,
929    query_engine: &QueryEngineRef,
930    catalog_manager: &CatalogManagerRef,
931    query_ctx: QueryContextRef,
932) -> Result<Output> {
933    let projects = vec![(flows::FLOW_NAME, FLOWS_COLUMN)];
934    let filters = vec![col(flows::TABLE_CATALOG).eq(lit(query_ctx.current_catalog()))];
935    let like_field = Some(flows::FLOW_NAME);
936    let sort = vec![col(flows::FLOW_NAME).sort(true, true)];
937
938    query_from_information_schema_table(
939        query_engine,
940        catalog_manager,
941        query_ctx,
942        FLOWS,
943        vec![],
944        projects,
945        filters,
946        like_field,
947        sort,
948        stmt.kind,
949    )
950    .await
951}
952
953pub fn show_create_flow(
954    flow_name: ObjectName,
955    flow_val: FlowInfoValue,
956    query_ctx: QueryContextRef,
957) -> Result<Output> {
958    let mut parser_ctx =
959        ParserContext::new(query_ctx.sql_dialect(), flow_val.raw_sql()).context(error::SqlSnafu)?;
960
961    let query = parser_ctx.parser_query().context(error::SqlSnafu)?;
962
963    let comment = if flow_val.comment().is_empty() {
964        None
965    } else {
966        Some(flow_val.comment().clone())
967    };
968
969    let stmt = CreateFlow {
970        flow_name,
971        sink_table_name: ObjectName(vec![Ident::new(&flow_val.sink_table_name().table_name)]),
972        // notice we don't want `OR REPLACE` and `IF NOT EXISTS` in same sql since it's unclear what to do
973        // so we set `or_replace` to false.
974        or_replace: false,
975        if_not_exists: true,
976        expire_after: flow_val.expire_after(),
977        comment,
978        query,
979    };
980
981    let sql = format!("{}", stmt);
982    let columns = vec![
983        Arc::new(StringVector::from(vec![flow_val.flow_name().clone()])) as _,
984        Arc::new(StringVector::from(vec![sql])) as _,
985    ];
986    let records = RecordBatches::try_from_columns(SHOW_CREATE_FLOW_OUTPUT_SCHEMA.clone(), columns)
987        .context(error::CreateRecordBatchSnafu)?;
988
989    Ok(Output::new_with_record_batches(records))
990}
991
992pub fn describe_table(table: TableRef) -> Result<Output> {
993    let table_info = table.table_info();
994    let columns_schemas = table_info.meta.schema.column_schemas();
995    let columns = vec![
996        describe_column_names(columns_schemas),
997        describe_column_types(columns_schemas),
998        describe_column_keys(columns_schemas, &table_info.meta.primary_key_indices),
999        describe_column_nullables(columns_schemas),
1000        describe_column_defaults(columns_schemas),
1001        describe_column_semantic_types(columns_schemas, &table_info.meta.primary_key_indices),
1002    ];
1003    let records = RecordBatches::try_from_columns(DESCRIBE_TABLE_OUTPUT_SCHEMA.clone(), columns)
1004        .context(error::CreateRecordBatchSnafu)?;
1005    Ok(Output::new_with_record_batches(records))
1006}
1007
1008fn describe_column_names(columns_schemas: &[ColumnSchema]) -> VectorRef {
1009    Arc::new(StringVector::from_iterator(
1010        columns_schemas.iter().map(|cs| cs.name.as_str()),
1011    ))
1012}
1013
1014fn describe_column_types(columns_schemas: &[ColumnSchema]) -> VectorRef {
1015    Arc::new(StringVector::from(
1016        columns_schemas
1017            .iter()
1018            .map(|cs| cs.data_type.name())
1019            .collect::<Vec<_>>(),
1020    ))
1021}
1022
1023fn describe_column_keys(
1024    columns_schemas: &[ColumnSchema],
1025    primary_key_indices: &[usize],
1026) -> VectorRef {
1027    Arc::new(StringVector::from_iterator(
1028        columns_schemas.iter().enumerate().map(|(i, cs)| {
1029            if cs.is_time_index() || primary_key_indices.contains(&i) {
1030                PRI_KEY
1031            } else {
1032                ""
1033            }
1034        }),
1035    ))
1036}
1037
1038fn describe_column_nullables(columns_schemas: &[ColumnSchema]) -> VectorRef {
1039    Arc::new(StringVector::from_iterator(columns_schemas.iter().map(
1040        |cs| {
1041            if cs.is_nullable() {
1042                YES_STR
1043            } else {
1044                NO_STR
1045            }
1046        },
1047    )))
1048}
1049
1050fn describe_column_defaults(columns_schemas: &[ColumnSchema]) -> VectorRef {
1051    Arc::new(StringVector::from(
1052        columns_schemas
1053            .iter()
1054            .map(|cs| {
1055                cs.default_constraint()
1056                    .map_or(String::from(""), |dc| dc.to_string())
1057            })
1058            .collect::<Vec<String>>(),
1059    ))
1060}
1061
1062fn describe_column_semantic_types(
1063    columns_schemas: &[ColumnSchema],
1064    primary_key_indices: &[usize],
1065) -> VectorRef {
1066    Arc::new(StringVector::from_iterator(
1067        columns_schemas.iter().enumerate().map(|(i, cs)| {
1068            if primary_key_indices.contains(&i) {
1069                SEMANTIC_TYPE_PRIMARY_KEY
1070            } else if cs.is_time_index() {
1071                SEMANTIC_TYPE_TIME_INDEX
1072            } else {
1073                SEMANTIC_TYPE_FIELD
1074            }
1075        }),
1076    ))
1077}
1078
1079// lists files in the frontend to reduce unnecessary scan requests repeated in each datanode.
1080pub async fn prepare_file_table_files(
1081    options: &HashMap<String, String>,
1082) -> Result<(ObjectStore, Vec<String>)> {
1083    let url = options
1084        .get(FILE_TABLE_LOCATION_KEY)
1085        .context(error::MissingRequiredFieldSnafu {
1086            name: FILE_TABLE_LOCATION_KEY,
1087        })?;
1088
1089    let (dir, filename) = find_dir_and_filename(url);
1090    let source = if let Some(filename) = filename {
1091        Source::Filename(filename)
1092    } else {
1093        Source::Dir
1094    };
1095    let regex = options
1096        .get(FILE_TABLE_PATTERN_KEY)
1097        .map(|x| Regex::new(x))
1098        .transpose()
1099        .context(error::BuildRegexSnafu)?;
1100    let object_store = build_backend(url, options).context(error::BuildBackendSnafu)?;
1101    let lister = Lister::new(object_store.clone(), source, dir, regex);
1102    // If we scan files in a directory every time the database restarts,
1103    // then it might lead to a potential undefined behavior:
1104    // If a user adds a file with an incompatible schema to that directory,
1105    // it will make the external table unavailable.
1106    let files = lister
1107        .list()
1108        .await
1109        .context(error::ListObjectsSnafu)?
1110        .into_iter()
1111        .filter_map(|entry| {
1112            if entry.path().ends_with('/') {
1113                None
1114            } else {
1115                Some(entry.path().to_string())
1116            }
1117        })
1118        .collect::<Vec<_>>();
1119    Ok((object_store, files))
1120}
1121
1122pub async fn infer_file_table_schema(
1123    object_store: &ObjectStore,
1124    files: &[String],
1125    options: &HashMap<String, String>,
1126) -> Result<RawSchema> {
1127    let format = parse_file_table_format(options)?;
1128    let merged = infer_schemas(object_store, files, format.as_ref())
1129        .await
1130        .context(error::InferSchemaSnafu)?;
1131    Ok(RawSchema::from(
1132        &Schema::try_from(merged).context(error::ConvertSchemaSnafu)?,
1133    ))
1134}
1135
1136// Converts the file column schemas to table column schemas.
1137// Returns the column schemas and the time index column name.
1138//
1139// More specifically, this function will do the following:
1140// 1. Add a default time index column if there is no time index column
1141//    in the file column schemas, or
1142// 2. If the file column schemas contain a column with name conflicts with
1143//    the default time index column, it will replace the column schema
1144//    with the default one.
1145pub fn file_column_schemas_to_table(
1146    file_column_schemas: &[ColumnSchema],
1147) -> (Vec<ColumnSchema>, String) {
1148    let mut column_schemas = file_column_schemas.to_owned();
1149    if let Some(time_index_column) = column_schemas.iter().find(|c| c.is_time_index()) {
1150        let time_index = time_index_column.name.clone();
1151        return (column_schemas, time_index);
1152    }
1153
1154    let timestamp_type = ConcreteDataType::timestamp_millisecond_datatype();
1155    let default_zero = Value::Timestamp(Timestamp::new_millisecond(0));
1156    let timestamp_column_schema = ColumnSchema::new(GREPTIME_TIMESTAMP, timestamp_type, false)
1157        .with_time_index(true)
1158        .with_default_constraint(Some(ColumnDefaultConstraint::Value(default_zero)))
1159        .unwrap();
1160
1161    if let Some(column_schema) = column_schemas
1162        .iter_mut()
1163        .find(|column_schema| column_schema.name == GREPTIME_TIMESTAMP)
1164    {
1165        // Replace the column schema with the default one
1166        *column_schema = timestamp_column_schema;
1167    } else {
1168        column_schemas.push(timestamp_column_schema);
1169    }
1170
1171    (column_schemas, GREPTIME_TIMESTAMP.to_string())
1172}
1173
1174/// This function checks if the column schemas from a file can be matched with
1175/// the column schemas of a table.
1176///
1177/// More specifically, for each column seen in the table schema,
1178/// - If the same column does exist in the file schema, it checks if the data
1179///   type of the file column can be casted into the form of the table column.
1180/// - If the same column does not exist in the file schema, it checks if the
1181///   table column is nullable or has a default constraint.
1182pub fn check_file_to_table_schema_compatibility(
1183    file_column_schemas: &[ColumnSchema],
1184    table_column_schemas: &[ColumnSchema],
1185) -> Result<()> {
1186    let file_schemas_map = file_column_schemas
1187        .iter()
1188        .map(|s| (s.name.clone(), s))
1189        .collect::<HashMap<_, _>>();
1190
1191    for table_column in table_column_schemas {
1192        if let Some(file_column) = file_schemas_map.get(&table_column.name) {
1193            // TODO(zhongzc): a temporary solution, we should use `can_cast_to` once it's ready.
1194            ensure!(
1195                file_column
1196                    .data_type
1197                    .can_arrow_type_cast_to(&table_column.data_type),
1198                error::ColumnSchemaIncompatibleSnafu {
1199                    column: table_column.name.clone(),
1200                    file_type: file_column.data_type.clone(),
1201                    table_type: table_column.data_type.clone(),
1202                }
1203            );
1204        } else {
1205            ensure!(
1206                table_column.is_nullable() || table_column.default_constraint().is_some(),
1207                error::ColumnSchemaNoDefaultSnafu {
1208                    column: table_column.name.clone(),
1209                }
1210            );
1211        }
1212    }
1213
1214    Ok(())
1215}
1216
1217fn parse_file_table_format(options: &HashMap<String, String>) -> Result<Box<dyn FileFormat>> {
1218    Ok(
1219        match Format::try_from(options).context(error::ParseFileFormatSnafu)? {
1220            Format::Csv(format) => Box::new(format),
1221            Format::Json(format) => Box::new(format),
1222            Format::Parquet(format) => Box::new(format),
1223            Format::Orc(format) => Box::new(format),
1224        },
1225    )
1226}
1227
1228#[cfg(test)]
1229mod test {
1230    use std::sync::Arc;
1231
1232    use common_query::{Output, OutputData};
1233    use common_recordbatch::{RecordBatch, RecordBatches};
1234    use common_time::timestamp::TimeUnit;
1235    use common_time::Timezone;
1236    use datatypes::prelude::ConcreteDataType;
1237    use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema, SchemaRef};
1238    use datatypes::vectors::{StringVector, TimestampMillisecondVector, UInt32Vector, VectorRef};
1239    use session::context::QueryContextBuilder;
1240    use snafu::ResultExt;
1241    use sql::ast::{Ident, ObjectName};
1242    use sql::statements::show::ShowVariables;
1243    use table::test_util::MemTable;
1244    use table::TableRef;
1245
1246    use super::show_variable;
1247    use crate::error;
1248    use crate::error::Result;
1249    use crate::sql::{
1250        describe_table, DESCRIBE_TABLE_OUTPUT_SCHEMA, NO_STR, SEMANTIC_TYPE_FIELD,
1251        SEMANTIC_TYPE_TIME_INDEX, YES_STR,
1252    };
1253
1254    #[test]
1255    fn test_describe_table_multiple_columns() -> Result<()> {
1256        let table_name = "test_table";
1257        let schema = vec![
1258            ColumnSchema::new("t1", ConcreteDataType::uint32_datatype(), true),
1259            ColumnSchema::new(
1260                "t2",
1261                ConcreteDataType::timestamp_datatype(TimeUnit::Millisecond),
1262                false,
1263            )
1264            .with_default_constraint(Some(ColumnDefaultConstraint::Function(String::from(
1265                "current_timestamp()",
1266            ))))
1267            .unwrap()
1268            .with_time_index(true),
1269        ];
1270        let data = vec![
1271            Arc::new(UInt32Vector::from_slice([0])) as _,
1272            Arc::new(TimestampMillisecondVector::from_slice([0])) as _,
1273        ];
1274        let expected_columns = vec![
1275            Arc::new(StringVector::from(vec!["t1", "t2"])) as _,
1276            Arc::new(StringVector::from(vec!["UInt32", "TimestampMillisecond"])) as _,
1277            Arc::new(StringVector::from(vec!["", "PRI"])) as _,
1278            Arc::new(StringVector::from(vec![YES_STR, NO_STR])) as _,
1279            Arc::new(StringVector::from(vec!["", "current_timestamp()"])) as _,
1280            Arc::new(StringVector::from(vec![
1281                SEMANTIC_TYPE_FIELD,
1282                SEMANTIC_TYPE_TIME_INDEX,
1283            ])) as _,
1284        ];
1285
1286        describe_table_test_by_schema(table_name, schema, data, expected_columns)
1287    }
1288
1289    fn describe_table_test_by_schema(
1290        table_name: &str,
1291        schema: Vec<ColumnSchema>,
1292        data: Vec<VectorRef>,
1293        expected_columns: Vec<VectorRef>,
1294    ) -> Result<()> {
1295        let table_schema = SchemaRef::new(Schema::new(schema));
1296        let table = prepare_describe_table(table_name, table_schema, data);
1297
1298        let expected =
1299            RecordBatches::try_from_columns(DESCRIBE_TABLE_OUTPUT_SCHEMA.clone(), expected_columns)
1300                .context(error::CreateRecordBatchSnafu)?;
1301
1302        if let OutputData::RecordBatches(res) = describe_table(table)?.data {
1303            assert_eq!(res.take(), expected.take());
1304        } else {
1305            panic!("describe table must return record batch");
1306        }
1307
1308        Ok(())
1309    }
1310
1311    fn prepare_describe_table(
1312        table_name: &str,
1313        table_schema: SchemaRef,
1314        data: Vec<VectorRef>,
1315    ) -> TableRef {
1316        let record_batch = RecordBatch::new(table_schema, data).unwrap();
1317        MemTable::table(table_name, record_batch)
1318    }
1319
1320    #[test]
1321    fn test_show_variable() {
1322        assert_eq!(
1323            exec_show_variable("SYSTEM_TIME_ZONE", "Asia/Shanghai").unwrap(),
1324            "UTC"
1325        );
1326        assert_eq!(
1327            exec_show_variable("SYSTEM_TIMEZONE", "Asia/Shanghai").unwrap(),
1328            "UTC"
1329        );
1330        assert_eq!(
1331            exec_show_variable("TIME_ZONE", "Asia/Shanghai").unwrap(),
1332            "Asia/Shanghai"
1333        );
1334        assert_eq!(
1335            exec_show_variable("TIMEZONE", "Asia/Shanghai").unwrap(),
1336            "Asia/Shanghai"
1337        );
1338        assert!(exec_show_variable("TIME ZONE", "Asia/Shanghai").is_err());
1339        assert!(exec_show_variable("SYSTEM TIME ZONE", "Asia/Shanghai").is_err());
1340    }
1341
1342    fn exec_show_variable(variable: &str, tz: &str) -> Result<String> {
1343        let stmt = ShowVariables {
1344            variable: ObjectName(vec![Ident::new(variable)]),
1345        };
1346        let ctx = Arc::new(
1347            QueryContextBuilder::default()
1348                .timezone(Timezone::from_tz_string(tz).unwrap())
1349                .build(),
1350        );
1351        match show_variable(stmt, ctx) {
1352            Ok(Output {
1353                data: OutputData::RecordBatches(record),
1354                ..
1355            }) => {
1356                let record = record.take().first().cloned().unwrap();
1357                let data = record.column(0);
1358                Ok(data.get(0).to_string())
1359            }
1360            Ok(_) => unreachable!(),
1361            Err(e) => Err(e),
1362        }
1363    }
1364}