query/
sql.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod show_create_table;
16
17use std::collections::HashMap;
18use std::sync::Arc;
19
20use catalog::information_schema::{
21    columns, flows, key_column_usage, process_list, region_peers, schemata, tables, CHARACTER_SETS,
22    COLLATIONS, COLUMNS, FLOWS, KEY_COLUMN_USAGE, REGION_PEERS, SCHEMATA, TABLES, VIEWS,
23};
24use catalog::CatalogManagerRef;
25use common_catalog::consts::{
26    INFORMATION_SCHEMA_NAME, SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_PRIMARY_KEY,
27    SEMANTIC_TYPE_TIME_INDEX,
28};
29use common_catalog::format_full_table_name;
30use common_datasource::file_format::{infer_schemas, FileFormat, Format};
31use common_datasource::lister::{Lister, Source};
32use common_datasource::object_store::build_backend;
33use common_datasource::util::find_dir_and_filename;
34use common_meta::key::flow::flow_info::FlowInfoValue;
35use common_meta::SchemaOptions;
36use common_query::prelude::GREPTIME_TIMESTAMP;
37use common_query::Output;
38use common_recordbatch::adapter::RecordBatchStreamAdapter;
39use common_recordbatch::RecordBatches;
40use common_time::timezone::get_timezone;
41use common_time::Timestamp;
42use datafusion::common::ScalarValue;
43use datafusion::prelude::SessionContext;
44use datafusion_expr::{case, col, lit, Expr, SortExpr};
45use datatypes::prelude::*;
46use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, RawSchema, Schema};
47use datatypes::vectors::StringVector;
48use itertools::Itertools;
49use object_store::ObjectStore;
50use once_cell::sync::Lazy;
51use regex::Regex;
52use session::context::{Channel, QueryContextRef};
53pub use show_create_table::create_table_stmt;
54use snafu::{ensure, OptionExt, ResultExt};
55use sql::ast::Ident;
56use sql::parser::ParserContext;
57use sql::statements::create::{CreateDatabase, CreateFlow, CreateView, Partitions, SqlOrTql};
58use sql::statements::show::{
59    ShowColumns, ShowDatabases, ShowFlows, ShowIndex, ShowKind, ShowProcessList, ShowRegion,
60    ShowTableStatus, ShowTables, ShowVariables, ShowViews,
61};
62use sql::statements::statement::Statement;
63use sql::statements::OptionMap;
64use sqlparser::ast::ObjectName;
65use store_api::metric_engine_consts::{is_metric_engine, is_metric_engine_internal_column};
66use table::requests::{FILE_TABLE_LOCATION_KEY, FILE_TABLE_PATTERN_KEY};
67use table::TableRef;
68
69use crate::dataframe::DataFrame;
70use crate::error::{self, Result, UnsupportedVariableSnafu};
71use crate::planner::DfLogicalPlanner;
72use crate::QueryEngineRef;
73
// Column names shown to the client in the results of the `SHOW`/`DESCRIBE`
// statements implemented in this file. The `information_schema` column names are
// renamed to these display names.

// Display name of the schema name column in `SHOW DATABASES`.
const SCHEMAS_COLUMN: &str = "Database";
// Display name of the schema options column in `SHOW FULL DATABASES`.
const OPTIONS_COLUMN: &str = "Options";
// Display name of the table name column in `SHOW TABLES`.
const TABLES_COLUMN: &str = "Tables";
// Display name of the view name column in `SHOW VIEWS`.
const VIEWS_COLUMN: &str = "Views";
// Display name of the flow name column in `SHOW FLOWS`.
const FLOWS_COLUMN: &str = "Flows";
// Display name of the column name column in `SHOW COLUMNS`.
const FIELD_COLUMN: &str = "Field";
// Display name of the table type column in `SHOW FULL TABLES`.
const TABLE_TYPE_COLUMN: &str = "Table_type";
// Name of the column name column in the describe-table output schema.
const COLUMN_NAME_COLUMN: &str = "Column";
// Per-column attributes; used by `SHOW COLUMNS`, `SHOW INDEX` and the
// describe-table output schema.
const COLUMN_GREPTIME_TYPE_COLUMN: &str = "Greptime_type";
const COLUMN_TYPE_COLUMN: &str = "Type";
const COLUMN_KEY_COLUMN: &str = "Key";
const COLUMN_EXTRA_COLUMN: &str = "Extra";
const COLUMN_PRIVILEGES_COLUMN: &str = "Privileges";
const COLUMN_COLLATION_COLUMN: &str = "Collation";
const COLUMN_NULLABLE_COLUMN: &str = "Null";
const COLUMN_DEFAULT_COLUMN: &str = "Default";
const COLUMN_COMMENT_COLUMN: &str = "Comment";
const COLUMN_SEMANTIC_TYPE_COLUMN: &str = "Semantic Type";
92
// String values placed into result cells.
// `YES_STR`/`NO_STR` are used by `SHOW INDEX` for the `Null` and `Visible` columns.
const YES_STR: &str = "YES";
const NO_STR: &str = "NO";
// NOTE(review): presumably the `Key` value of primary key columns; not used in the
// part of the file visible here — confirm against the describe-table code.
const PRI_KEY: &str = "PRI";
// Constraint name that `SHOW INDEX` compares against to report `Null` as `NO`.
const TIME_INDEX: &str = "TIME INDEX";
97
// Column names of the `SHOW INDEX` output.
const INDEX_TABLE_COLUMN: &str = "Table";
// NOTE(review): the identifier contains a typo (`NONT` instead of `NON`); the
// displayed column name `Non_unique` is spelled correctly.
const INDEX_NONT_UNIQUE_COLUMN: &str = "Non_unique";
const INDEX_CARDINALITY_COLUMN: &str = "Cardinality";
const INDEX_SUB_PART_COLUMN: &str = "Sub_part";
const INDEX_PACKED_COLUMN: &str = "Packed";
const INDEX_INDEX_TYPE_COLUMN: &str = "Index_type";
const INDEX_COMMENT_COLUMN: &str = "Index_comment";
const INDEX_VISIBLE_COLUMN: &str = "Visible";
const INDEX_EXPRESSION_COLUMN: &str = "Expression";
const INDEX_KEY_NAME_COLUMN: &str = "Key_name";
const INDEX_SEQ_IN_INDEX_COLUMN: &str = "Seq_in_index";
const INDEX_COLUMN_NAME_COLUMN: &str = "Column_name";
111
112static DESCRIBE_TABLE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
113    Arc::new(Schema::new(vec![
114        ColumnSchema::new(
115            COLUMN_NAME_COLUMN,
116            ConcreteDataType::string_datatype(),
117            false,
118        ),
119        ColumnSchema::new(
120            COLUMN_TYPE_COLUMN,
121            ConcreteDataType::string_datatype(),
122            false,
123        ),
124        ColumnSchema::new(COLUMN_KEY_COLUMN, ConcreteDataType::string_datatype(), true),
125        ColumnSchema::new(
126            COLUMN_NULLABLE_COLUMN,
127            ConcreteDataType::string_datatype(),
128            false,
129        ),
130        ColumnSchema::new(
131            COLUMN_DEFAULT_COLUMN,
132            ConcreteDataType::string_datatype(),
133            false,
134        ),
135        ColumnSchema::new(
136            COLUMN_SEMANTIC_TYPE_COLUMN,
137            ConcreteDataType::string_datatype(),
138            false,
139        ),
140    ]))
141});
142
143static SHOW_CREATE_DATABASE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
144    Arc::new(Schema::new(vec![
145        ColumnSchema::new("Database", ConcreteDataType::string_datatype(), false),
146        ColumnSchema::new(
147            "Create Database",
148            ConcreteDataType::string_datatype(),
149            false,
150        ),
151    ]))
152});
153
154static SHOW_CREATE_TABLE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
155    Arc::new(Schema::new(vec![
156        ColumnSchema::new("Table", ConcreteDataType::string_datatype(), false),
157        ColumnSchema::new("Create Table", ConcreteDataType::string_datatype(), false),
158    ]))
159});
160
161static SHOW_CREATE_FLOW_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
162    Arc::new(Schema::new(vec![
163        ColumnSchema::new("Flow", ConcreteDataType::string_datatype(), false),
164        ColumnSchema::new("Create Flow", ConcreteDataType::string_datatype(), false),
165    ]))
166});
167
168static SHOW_CREATE_VIEW_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
169    Arc::new(Schema::new(vec![
170        ColumnSchema::new("View", ConcreteDataType::string_datatype(), false),
171        ColumnSchema::new("Create View", ConcreteDataType::string_datatype(), false),
172    ]))
173});
174
175fn null() -> Expr {
176    lit(ScalarValue::Null)
177}
178
179pub async fn show_databases(
180    stmt: ShowDatabases,
181    query_engine: &QueryEngineRef,
182    catalog_manager: &CatalogManagerRef,
183    query_ctx: QueryContextRef,
184) -> Result<Output> {
185    let projects = if stmt.full {
186        vec![
187            (schemata::SCHEMA_NAME, SCHEMAS_COLUMN),
188            (schemata::SCHEMA_OPTS, OPTIONS_COLUMN),
189        ]
190    } else {
191        vec![(schemata::SCHEMA_NAME, SCHEMAS_COLUMN)]
192    };
193
194    let filters = vec![col(schemata::CATALOG_NAME).eq(lit(query_ctx.current_catalog()))];
195    let like_field = Some(schemata::SCHEMA_NAME);
196    let sort = vec![col(schemata::SCHEMA_NAME).sort(true, true)];
197
198    query_from_information_schema_table(
199        query_engine,
200        catalog_manager,
201        query_ctx,
202        SCHEMATA,
203        vec![],
204        projects,
205        filters,
206        like_field,
207        sort,
208        stmt.kind,
209    )
210    .await
211}
212
213/// Cast a `show` statement execution into a query from tables in  `information_schema`.
214/// - `table_name`: the table name in `information_schema`,
215/// - `projects`: query projection, a list of `(column, renamed_column)`,
216/// - `filters`: filter expressions for query,
217/// - `like_field`: the field to filter by the predicate `ShowKind::Like`,
218/// - `sort`: sort the results by the specified sorting expressions,
219/// - `kind`: the show kind
220#[allow(clippy::too_many_arguments)]
221async fn query_from_information_schema_table(
222    query_engine: &QueryEngineRef,
223    catalog_manager: &CatalogManagerRef,
224    query_ctx: QueryContextRef,
225    table_name: &str,
226    select: Vec<Expr>,
227    projects: Vec<(&str, &str)>,
228    filters: Vec<Expr>,
229    like_field: Option<&str>,
230    sort: Vec<SortExpr>,
231    kind: ShowKind,
232) -> Result<Output> {
233    let table = catalog_manager
234        .table(
235            query_ctx.current_catalog(),
236            INFORMATION_SCHEMA_NAME,
237            table_name,
238            Some(&query_ctx),
239        )
240        .await
241        .context(error::CatalogSnafu)?
242        .with_context(|| error::TableNotFoundSnafu {
243            table: format_full_table_name(
244                query_ctx.current_catalog(),
245                INFORMATION_SCHEMA_NAME,
246                table_name,
247            ),
248        })?;
249
250    let DataFrame::DataFusion(dataframe) = query_engine.read_table(table)?;
251
252    // Apply filters
253    let dataframe = filters.into_iter().try_fold(dataframe, |df, expr| {
254        df.filter(expr).context(error::PlanSqlSnafu)
255    })?;
256
257    // Apply `like` predicate if exists
258    let dataframe = if let (ShowKind::Like(ident), Some(field)) = (&kind, like_field) {
259        dataframe
260            .filter(col(field).like(lit(ident.value.clone())))
261            .context(error::PlanSqlSnafu)?
262    } else {
263        dataframe
264    };
265
266    // Apply sorting
267    let dataframe = if sort.is_empty() {
268        dataframe
269    } else {
270        dataframe.sort(sort).context(error::PlanSqlSnafu)?
271    };
272
273    // Apply select
274    let dataframe = if select.is_empty() {
275        if projects.is_empty() {
276            dataframe
277        } else {
278            let projection = projects
279                .iter()
280                .map(|x| col(x.0).alias(x.1))
281                .collect::<Vec<_>>();
282            dataframe.select(projection).context(error::PlanSqlSnafu)?
283        }
284    } else {
285        dataframe.select(select).context(error::PlanSqlSnafu)?
286    };
287
288    // Apply projection
289    let dataframe = projects
290        .into_iter()
291        .try_fold(dataframe, |df, (column, renamed_column)| {
292            df.with_column_renamed(column, renamed_column)
293                .context(error::PlanSqlSnafu)
294        })?;
295
296    let dataframe = match kind {
297        ShowKind::All | ShowKind::Like(_) => {
298            // Like kind is processed above
299            dataframe
300        }
301        ShowKind::Where(filter) => {
302            // Cast the results into VIEW for `where` clause,
303            // which is evaluated against the column names displayed by the SHOW statement.
304            let view = dataframe.into_view();
305            let dataframe = SessionContext::new_with_state(
306                query_engine
307                    .engine_context(query_ctx.clone())
308                    .state()
309                    .clone(),
310            )
311            .read_table(view)?;
312
313            let planner = query_engine.planner();
314            let planner = planner
315                .as_any()
316                .downcast_ref::<DfLogicalPlanner>()
317                .expect("Must be the datafusion planner");
318
319            let filter = planner
320                .sql_to_expr(filter, dataframe.schema(), false, query_ctx)
321                .await?;
322
323            // Apply the `where` clause filters
324            dataframe.filter(filter).context(error::PlanSqlSnafu)?
325        }
326    };
327
328    let stream = dataframe.execute_stream().await?;
329
330    Ok(Output::new_with_stream(Box::pin(
331        RecordBatchStreamAdapter::try_new(stream).context(error::CreateRecordBatchSnafu)?,
332    )))
333}
334
335/// Execute `SHOW COLUMNS` statement.
336pub async fn show_columns(
337    stmt: ShowColumns,
338    query_engine: &QueryEngineRef,
339    catalog_manager: &CatalogManagerRef,
340    query_ctx: QueryContextRef,
341) -> Result<Output> {
342    let schema_name = if let Some(database) = stmt.database {
343        database
344    } else {
345        query_ctx.current_schema()
346    };
347
348    let projects = if stmt.full {
349        vec![
350            (columns::COLUMN_NAME, FIELD_COLUMN),
351            (columns::DATA_TYPE, COLUMN_TYPE_COLUMN),
352            (columns::COLLATION_NAME, COLUMN_COLLATION_COLUMN),
353            (columns::IS_NULLABLE, COLUMN_NULLABLE_COLUMN),
354            (columns::COLUMN_KEY, COLUMN_KEY_COLUMN),
355            (columns::COLUMN_DEFAULT, COLUMN_DEFAULT_COLUMN),
356            (columns::COLUMN_COMMENT, COLUMN_COMMENT_COLUMN),
357            (columns::PRIVILEGES, COLUMN_PRIVILEGES_COLUMN),
358            (columns::EXTRA, COLUMN_EXTRA_COLUMN),
359            (columns::GREPTIME_DATA_TYPE, COLUMN_GREPTIME_TYPE_COLUMN),
360        ]
361    } else {
362        vec![
363            (columns::COLUMN_NAME, FIELD_COLUMN),
364            (columns::DATA_TYPE, COLUMN_TYPE_COLUMN),
365            (columns::IS_NULLABLE, COLUMN_NULLABLE_COLUMN),
366            (columns::COLUMN_KEY, COLUMN_KEY_COLUMN),
367            (columns::COLUMN_DEFAULT, COLUMN_DEFAULT_COLUMN),
368            (columns::EXTRA, COLUMN_EXTRA_COLUMN),
369            (columns::GREPTIME_DATA_TYPE, COLUMN_GREPTIME_TYPE_COLUMN),
370        ]
371    };
372
373    let filters = vec![
374        col(columns::TABLE_NAME).eq(lit(&stmt.table)),
375        col(columns::TABLE_SCHEMA).eq(lit(schema_name.clone())),
376        col(columns::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
377    ];
378    let like_field = Some(columns::COLUMN_NAME);
379    let sort = vec![col(columns::COLUMN_NAME).sort(true, true)];
380
381    query_from_information_schema_table(
382        query_engine,
383        catalog_manager,
384        query_ctx,
385        COLUMNS,
386        vec![],
387        projects,
388        filters,
389        like_field,
390        sort,
391        stmt.kind,
392    )
393    .await
394}
395
396/// Execute `SHOW INDEX` statement.
397pub async fn show_index(
398    stmt: ShowIndex,
399    query_engine: &QueryEngineRef,
400    catalog_manager: &CatalogManagerRef,
401    query_ctx: QueryContextRef,
402) -> Result<Output> {
403    let schema_name = if let Some(database) = stmt.database {
404        database
405    } else {
406        query_ctx.current_schema()
407    };
408
409    let select = vec![
410        col(key_column_usage::TABLE_NAME).alias(INDEX_TABLE_COLUMN),
411        // 1 as `Non_unique`: contain duplicates
412        lit(1).alias(INDEX_NONT_UNIQUE_COLUMN),
413        col(key_column_usage::CONSTRAINT_NAME).alias(INDEX_KEY_NAME_COLUMN),
414        col(key_column_usage::ORDINAL_POSITION).alias(INDEX_SEQ_IN_INDEX_COLUMN),
415        col(key_column_usage::COLUMN_NAME).alias(INDEX_COLUMN_NAME_COLUMN),
416        // How the column is sorted in the index: A (ascending).
417        lit("A").alias(COLUMN_COLLATION_COLUMN),
418        null().alias(INDEX_CARDINALITY_COLUMN),
419        null().alias(INDEX_SUB_PART_COLUMN),
420        null().alias(INDEX_PACKED_COLUMN),
421        // case `constraint_name`
422        //    when 'TIME INDEX' then 'NO'
423        //    else 'YES'
424        // end as `Null`
425        case(col(key_column_usage::CONSTRAINT_NAME))
426            .when(lit(TIME_INDEX), lit(NO_STR))
427            .otherwise(lit(YES_STR))
428            .context(error::PlanSqlSnafu)?
429            .alias(COLUMN_NULLABLE_COLUMN),
430        col(key_column_usage::GREPTIME_INDEX_TYPE).alias(INDEX_INDEX_TYPE_COLUMN),
431        lit("").alias(COLUMN_COMMENT_COLUMN),
432        lit("").alias(INDEX_COMMENT_COLUMN),
433        lit(YES_STR).alias(INDEX_VISIBLE_COLUMN),
434        null().alias(INDEX_EXPRESSION_COLUMN),
435    ];
436
437    let projects = vec![
438        (key_column_usage::TABLE_NAME, INDEX_TABLE_COLUMN),
439        (INDEX_NONT_UNIQUE_COLUMN, INDEX_NONT_UNIQUE_COLUMN),
440        (key_column_usage::CONSTRAINT_NAME, INDEX_KEY_NAME_COLUMN),
441        (
442            key_column_usage::ORDINAL_POSITION,
443            INDEX_SEQ_IN_INDEX_COLUMN,
444        ),
445        (key_column_usage::COLUMN_NAME, INDEX_COLUMN_NAME_COLUMN),
446        (COLUMN_COLLATION_COLUMN, COLUMN_COLLATION_COLUMN),
447        (INDEX_CARDINALITY_COLUMN, INDEX_CARDINALITY_COLUMN),
448        (INDEX_SUB_PART_COLUMN, INDEX_SUB_PART_COLUMN),
449        (INDEX_PACKED_COLUMN, INDEX_PACKED_COLUMN),
450        (COLUMN_NULLABLE_COLUMN, COLUMN_NULLABLE_COLUMN),
451        (
452            key_column_usage::GREPTIME_INDEX_TYPE,
453            INDEX_INDEX_TYPE_COLUMN,
454        ),
455        (COLUMN_COMMENT_COLUMN, COLUMN_COMMENT_COLUMN),
456        (INDEX_COMMENT_COLUMN, INDEX_COMMENT_COLUMN),
457        (INDEX_VISIBLE_COLUMN, INDEX_VISIBLE_COLUMN),
458        (INDEX_EXPRESSION_COLUMN, INDEX_EXPRESSION_COLUMN),
459    ];
460
461    let filters = vec![
462        col(key_column_usage::TABLE_NAME).eq(lit(&stmt.table)),
463        col(key_column_usage::TABLE_SCHEMA).eq(lit(schema_name.clone())),
464        col(key_column_usage::REAL_TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
465    ];
466    let like_field = None;
467    let sort = vec![col(columns::COLUMN_NAME).sort(true, true)];
468
469    query_from_information_schema_table(
470        query_engine,
471        catalog_manager,
472        query_ctx,
473        KEY_COLUMN_USAGE,
474        select,
475        projects,
476        filters,
477        like_field,
478        sort,
479        stmt.kind,
480    )
481    .await
482}
483
484/// Execute `SHOW REGION` statement.
485pub async fn show_region(
486    stmt: ShowRegion,
487    query_engine: &QueryEngineRef,
488    catalog_manager: &CatalogManagerRef,
489    query_ctx: QueryContextRef,
490) -> Result<Output> {
491    let schema_name = if let Some(database) = stmt.database {
492        database
493    } else {
494        query_ctx.current_schema()
495    };
496
497    let filters = vec![
498        col(region_peers::TABLE_NAME).eq(lit(&stmt.table)),
499        col(region_peers::TABLE_SCHEMA).eq(lit(schema_name.clone())),
500        col(region_peers::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
501    ];
502    let projects = vec![
503        (region_peers::TABLE_NAME, "Table"),
504        (region_peers::REGION_ID, "Region"),
505        (region_peers::PEER_ID, "Peer"),
506        (region_peers::IS_LEADER, "Leader"),
507    ];
508
509    let like_field = None;
510    let sort = vec![
511        col(columns::REGION_ID).sort(true, true),
512        col(columns::PEER_ID).sort(true, true),
513    ];
514
515    query_from_information_schema_table(
516        query_engine,
517        catalog_manager,
518        query_ctx,
519        REGION_PEERS,
520        vec![],
521        projects,
522        filters,
523        like_field,
524        sort,
525        stmt.kind,
526    )
527    .await
528}
529
530/// Execute [`ShowTables`] statement and return the [`Output`] if success.
531pub async fn show_tables(
532    stmt: ShowTables,
533    query_engine: &QueryEngineRef,
534    catalog_manager: &CatalogManagerRef,
535    query_ctx: QueryContextRef,
536) -> Result<Output> {
537    let schema_name = if let Some(database) = stmt.database {
538        database
539    } else {
540        query_ctx.current_schema()
541    };
542
543    // (dennis): MySQL rename `table_name` to `Tables_in_{schema}`, but we use `Tables` instead.
544    // I don't want to modify this currently, our dashboard may depend on it.
545    let projects = if stmt.full {
546        vec![
547            (tables::TABLE_NAME, TABLES_COLUMN),
548            (tables::TABLE_TYPE, TABLE_TYPE_COLUMN),
549        ]
550    } else {
551        vec![(tables::TABLE_NAME, TABLES_COLUMN)]
552    };
553    let filters = vec![
554        col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
555        col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
556    ];
557    let like_field = Some(tables::TABLE_NAME);
558    let sort = vec![col(tables::TABLE_NAME).sort(true, true)];
559
560    query_from_information_schema_table(
561        query_engine,
562        catalog_manager,
563        query_ctx,
564        TABLES,
565        vec![],
566        projects,
567        filters,
568        like_field,
569        sort,
570        stmt.kind,
571    )
572    .await
573}
574
575/// Execute [`ShowTableStatus`] statement and return the [`Output`] if success.
576pub async fn show_table_status(
577    stmt: ShowTableStatus,
578    query_engine: &QueryEngineRef,
579    catalog_manager: &CatalogManagerRef,
580    query_ctx: QueryContextRef,
581) -> Result<Output> {
582    let schema_name = if let Some(database) = stmt.database {
583        database
584    } else {
585        query_ctx.current_schema()
586    };
587
588    // Refer to https://dev.mysql.com/doc/refman/8.4/en/show-table-status.html
589    let projects = vec![
590        (tables::TABLE_NAME, "Name"),
591        (tables::ENGINE, "Engine"),
592        (tables::VERSION, "Version"),
593        (tables::ROW_FORMAT, "Row_format"),
594        (tables::TABLE_ROWS, "Rows"),
595        (tables::AVG_ROW_LENGTH, "Avg_row_length"),
596        (tables::DATA_LENGTH, "Data_length"),
597        (tables::MAX_DATA_LENGTH, "Max_data_length"),
598        (tables::INDEX_LENGTH, "Index_length"),
599        (tables::DATA_FREE, "Data_free"),
600        (tables::AUTO_INCREMENT, "Auto_increment"),
601        (tables::CREATE_TIME, "Create_time"),
602        (tables::UPDATE_TIME, "Update_time"),
603        (tables::CHECK_TIME, "Check_time"),
604        (tables::TABLE_COLLATION, "Collation"),
605        (tables::CHECKSUM, "Checksum"),
606        (tables::CREATE_OPTIONS, "Create_options"),
607        (tables::TABLE_COMMENT, "Comment"),
608    ];
609
610    let filters = vec![
611        col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
612        col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
613    ];
614    let like_field = Some(tables::TABLE_NAME);
615    let sort = vec![col(tables::TABLE_NAME).sort(true, true)];
616
617    query_from_information_schema_table(
618        query_engine,
619        catalog_manager,
620        query_ctx,
621        TABLES,
622        vec![],
623        projects,
624        filters,
625        like_field,
626        sort,
627        stmt.kind,
628    )
629    .await
630}
631
632/// Execute `SHOW COLLATION` statement and returns the `Output` if success.
633pub async fn show_collations(
634    kind: ShowKind,
635    query_engine: &QueryEngineRef,
636    catalog_manager: &CatalogManagerRef,
637    query_ctx: QueryContextRef,
638) -> Result<Output> {
639    // Refer to https://dev.mysql.com/doc/refman/8.0/en/show-collation.html
640    let projects = vec![
641        ("collation_name", "Collation"),
642        ("character_set_name", "Charset"),
643        ("id", "Id"),
644        ("is_default", "Default"),
645        ("is_compiled", "Compiled"),
646        ("sortlen", "Sortlen"),
647    ];
648
649    let filters = vec![];
650    let like_field = Some("collation_name");
651    let sort = vec![];
652
653    query_from_information_schema_table(
654        query_engine,
655        catalog_manager,
656        query_ctx,
657        COLLATIONS,
658        vec![],
659        projects,
660        filters,
661        like_field,
662        sort,
663        kind,
664    )
665    .await
666}
667
668/// Execute `SHOW CHARSET` statement and returns the `Output` if success.
669pub async fn show_charsets(
670    kind: ShowKind,
671    query_engine: &QueryEngineRef,
672    catalog_manager: &CatalogManagerRef,
673    query_ctx: QueryContextRef,
674) -> Result<Output> {
675    // Refer to https://dev.mysql.com/doc/refman/8.0/en/show-character-set.html
676    let projects = vec![
677        ("character_set_name", "Charset"),
678        ("description", "Description"),
679        ("default_collate_name", "Default collation"),
680        ("maxlen", "Maxlen"),
681    ];
682
683    let filters = vec![];
684    let like_field = Some("character_set_name");
685    let sort = vec![];
686
687    query_from_information_schema_table(
688        query_engine,
689        catalog_manager,
690        query_ctx,
691        CHARACTER_SETS,
692        vec![],
693        projects,
694        filters,
695        like_field,
696        sort,
697        kind,
698    )
699    .await
700}
701
702pub fn show_variable(stmt: ShowVariables, query_ctx: QueryContextRef) -> Result<Output> {
703    let variable = stmt.variable.to_string().to_uppercase();
704    let value = match variable.as_str() {
705        "SYSTEM_TIME_ZONE" | "SYSTEM_TIMEZONE" => get_timezone(None).to_string(),
706        "TIME_ZONE" | "TIMEZONE" => query_ctx.timezone().to_string(),
707        "READ_PREFERENCE" => query_ctx.read_preference().to_string(),
708        "DATESTYLE" => {
709            let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style();
710            format!("{}, {}", style, order)
711        }
712        "MAX_EXECUTION_TIME" => {
713            if query_ctx.channel() == Channel::Mysql {
714                query_ctx.query_timeout_as_millis().to_string()
715            } else {
716                return UnsupportedVariableSnafu { name: variable }.fail();
717            }
718        }
719        "STATEMENT_TIMEOUT" => {
720            // Add time units to postgres query timeout display.
721            if query_ctx.channel() == Channel::Postgres {
722                let mut timeout = query_ctx.query_timeout_as_millis().to_string();
723                timeout.push_str("ms");
724                timeout
725            } else {
726                return UnsupportedVariableSnafu { name: variable }.fail();
727            }
728        }
729        _ => return UnsupportedVariableSnafu { name: variable }.fail(),
730    };
731    let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
732        variable,
733        ConcreteDataType::string_datatype(),
734        false,
735    )]));
736    let records = RecordBatches::try_from_columns(
737        schema,
738        vec![Arc::new(StringVector::from(vec![value])) as _],
739    )
740    .context(error::CreateRecordBatchSnafu)?;
741    Ok(Output::new_with_record_batches(records))
742}
743
744pub async fn show_status(_query_ctx: QueryContextRef) -> Result<Output> {
745    let schema = Arc::new(Schema::new(vec![
746        ColumnSchema::new("Variable_name", ConcreteDataType::string_datatype(), false),
747        ColumnSchema::new("Value", ConcreteDataType::string_datatype(), true),
748    ]));
749    let records = RecordBatches::try_from_columns(
750        schema,
751        vec![
752            Arc::new(StringVector::from(Vec::<&str>::new())) as _,
753            Arc::new(StringVector::from(Vec::<&str>::new())) as _,
754        ],
755    )
756    .context(error::CreateRecordBatchSnafu)?;
757    Ok(Output::new_with_record_batches(records))
758}
759
760pub async fn show_search_path(_query_ctx: QueryContextRef) -> Result<Output> {
761    let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
762        "search_path",
763        ConcreteDataType::string_datatype(),
764        false,
765    )]));
766    let records = RecordBatches::try_from_columns(
767        schema,
768        vec![Arc::new(StringVector::from(vec![_query_ctx.current_schema()])) as _],
769    )
770    .context(error::CreateRecordBatchSnafu)?;
771    Ok(Output::new_with_record_batches(records))
772}
773
774pub fn show_create_database(database_name: &str, options: OptionMap) -> Result<Output> {
775    let stmt = CreateDatabase {
776        name: ObjectName::from(vec![Ident::new(database_name)]),
777        if_not_exists: true,
778        options,
779    };
780    let sql = format!("{stmt}");
781    let columns = vec![
782        Arc::new(StringVector::from(vec![database_name.to_string()])) as _,
783        Arc::new(StringVector::from(vec![sql])) as _,
784    ];
785    let records =
786        RecordBatches::try_from_columns(SHOW_CREATE_DATABASE_OUTPUT_SCHEMA.clone(), columns)
787            .context(error::CreateRecordBatchSnafu)?;
788    Ok(Output::new_with_record_batches(records))
789}
790
791pub fn show_create_table(
792    table: TableRef,
793    schema_options: Option<SchemaOptions>,
794    partitions: Option<Partitions>,
795    query_ctx: QueryContextRef,
796) -> Result<Output> {
797    let table_info = table.table_info();
798    let table_name = &table_info.name;
799
800    let quote_style = query_ctx.quote_style();
801
802    let mut stmt = create_table_stmt(&table_info, schema_options, quote_style)?;
803    stmt.partitions = partitions.map(|mut p| {
804        p.set_quote(quote_style);
805        p
806    });
807    let sql = format!("{}", stmt);
808    let columns = vec![
809        Arc::new(StringVector::from(vec![table_name.clone()])) as _,
810        Arc::new(StringVector::from(vec![sql])) as _,
811    ];
812    let records = RecordBatches::try_from_columns(SHOW_CREATE_TABLE_OUTPUT_SCHEMA.clone(), columns)
813        .context(error::CreateRecordBatchSnafu)?;
814
815    Ok(Output::new_with_record_batches(records))
816}
817
818pub fn show_create_foreign_table_for_pg(
819    table: TableRef,
820    _query_ctx: QueryContextRef,
821) -> Result<Output> {
822    let table_info = table.table_info();
823
824    let table_meta = &table_info.meta;
825    let table_name = &table_info.name;
826    let schema = &table_info.meta.schema;
827    let is_metric_engine = is_metric_engine(&table_meta.engine);
828
829    let columns = schema
830        .column_schemas()
831        .iter()
832        .filter_map(|c| {
833            if is_metric_engine && is_metric_engine_internal_column(&c.name) {
834                None
835            } else {
836                Some(format!(
837                    "\"{}\" {}",
838                    c.name,
839                    c.data_type.postgres_datatype_name()
840                ))
841            }
842        })
843        .join(",\n  ");
844
845    let sql = format!(
846        r#"CREATE FOREIGN TABLE ft_{} (
847  {}
848)
849SERVER greptimedb
850OPTIONS (table_name '{}')"#,
851        table_name, columns, table_name
852    );
853
854    let columns = vec![
855        Arc::new(StringVector::from(vec![table_name.clone()])) as _,
856        Arc::new(StringVector::from(vec![sql])) as _,
857    ];
858    let records = RecordBatches::try_from_columns(SHOW_CREATE_TABLE_OUTPUT_SCHEMA.clone(), columns)
859        .context(error::CreateRecordBatchSnafu)?;
860
861    Ok(Output::new_with_record_batches(records))
862}
863
864pub fn show_create_view(
865    view_name: ObjectName,
866    definition: &str,
867    query_ctx: QueryContextRef,
868) -> Result<Output> {
869    let mut parser_ctx =
870        ParserContext::new(query_ctx.sql_dialect(), definition).context(error::SqlSnafu)?;
871
872    let Statement::CreateView(create_view) =
873        parser_ctx.parse_statement().context(error::SqlSnafu)?
874    else {
875        // MUST be `CreateView` statement.
876        unreachable!();
877    };
878
879    let stmt = CreateView {
880        name: view_name.clone(),
881        columns: create_view.columns,
882        query: create_view.query,
883        or_replace: create_view.or_replace,
884        if_not_exists: create_view.if_not_exists,
885    };
886
887    let sql = format!("{}", stmt);
888    let columns = vec![
889        Arc::new(StringVector::from(vec![view_name.to_string()])) as _,
890        Arc::new(StringVector::from(vec![sql])) as _,
891    ];
892    let records = RecordBatches::try_from_columns(SHOW_CREATE_VIEW_OUTPUT_SCHEMA.clone(), columns)
893        .context(error::CreateRecordBatchSnafu)?;
894
895    Ok(Output::new_with_record_batches(records))
896}
897
898/// Execute [`ShowViews`] statement and return the [`Output`] if success.
899pub async fn show_views(
900    stmt: ShowViews,
901    query_engine: &QueryEngineRef,
902    catalog_manager: &CatalogManagerRef,
903    query_ctx: QueryContextRef,
904) -> Result<Output> {
905    let schema_name = if let Some(database) = stmt.database {
906        database
907    } else {
908        query_ctx.current_schema()
909    };
910
911    let projects = vec![(tables::TABLE_NAME, VIEWS_COLUMN)];
912    let filters = vec![
913        col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
914        col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
915    ];
916    let like_field = Some(tables::TABLE_NAME);
917    let sort = vec![col(tables::TABLE_NAME).sort(true, true)];
918
919    query_from_information_schema_table(
920        query_engine,
921        catalog_manager,
922        query_ctx,
923        VIEWS,
924        vec![],
925        projects,
926        filters,
927        like_field,
928        sort,
929        stmt.kind,
930    )
931    .await
932}
933
934/// Execute [`ShowFlows`] statement and return the [`Output`] if success.
935pub async fn show_flows(
936    stmt: ShowFlows,
937    query_engine: &QueryEngineRef,
938    catalog_manager: &CatalogManagerRef,
939    query_ctx: QueryContextRef,
940) -> Result<Output> {
941    let projects = vec![(flows::FLOW_NAME, FLOWS_COLUMN)];
942    let filters = vec![col(flows::TABLE_CATALOG).eq(lit(query_ctx.current_catalog()))];
943    let like_field = Some(flows::FLOW_NAME);
944    let sort = vec![col(flows::FLOW_NAME).sort(true, true)];
945
946    query_from_information_schema_table(
947        query_engine,
948        catalog_manager,
949        query_ctx,
950        FLOWS,
951        vec![],
952        projects,
953        filters,
954        like_field,
955        sort,
956        stmt.kind,
957    )
958    .await
959}
960
/// Execute the `ShowTriggers` statement and return the [`Output`] if success.
///
/// Reads the `trigger_name` column of the triggers information-schema table,
/// exposes it as `Triggers`, and applies no extra filters besides the
/// statement's own `kind`.
#[cfg(feature = "enterprise")]
pub async fn show_triggers(
    stmt: sql::statements::show::trigger::ShowTriggers,
    query_engine: &QueryEngineRef,
    catalog_manager: &CatalogManagerRef,
    query_ctx: QueryContextRef,
) -> Result<Output> {
    const TRIGGER_NAME: &str = "trigger_name";
    const TRIGGERS_COLUMN: &str = "Triggers";

    query_from_information_schema_table(
        query_engine,
        catalog_manager,
        query_ctx,
        catalog::information_schema::TRIGGERS,
        vec![],
        vec![(TRIGGER_NAME, TRIGGERS_COLUMN)],
        vec![],
        Some(TRIGGER_NAME),
        vec![col(TRIGGER_NAME).sort(true, true)],
        stmt.kind,
    )
    .await
}
989
990pub fn show_create_flow(
991    flow_name: ObjectName,
992    flow_val: FlowInfoValue,
993    query_ctx: QueryContextRef,
994) -> Result<Output> {
995    let mut parser_ctx =
996        ParserContext::new(query_ctx.sql_dialect(), flow_val.raw_sql()).context(error::SqlSnafu)?;
997
998    let query = parser_ctx.parse_statement().context(error::SqlSnafu)?;
999
1000    // since prom ql will parse `now()` to a fixed time, we need to not use it for generating raw query
1001    let raw_query = match &query {
1002        Statement::Tql(_) => flow_val.raw_sql().clone(),
1003        _ => query.to_string(),
1004    };
1005
1006    let query = Box::new(SqlOrTql::try_from_statement(query, &raw_query).context(error::SqlSnafu)?);
1007
1008    let comment = if flow_val.comment().is_empty() {
1009        None
1010    } else {
1011        Some(flow_val.comment().clone())
1012    };
1013
1014    let stmt = CreateFlow {
1015        flow_name,
1016        sink_table_name: ObjectName::from(vec![Ident::new(&flow_val.sink_table_name().table_name)]),
1017        // notice we don't want `OR REPLACE` and `IF NOT EXISTS` in same sql since it's unclear what to do
1018        // so we set `or_replace` to false.
1019        or_replace: false,
1020        if_not_exists: true,
1021        expire_after: flow_val.expire_after(),
1022        comment,
1023        query,
1024    };
1025
1026    let sql = format!("{}", stmt);
1027    let columns = vec![
1028        Arc::new(StringVector::from(vec![flow_val.flow_name().clone()])) as _,
1029        Arc::new(StringVector::from(vec![sql])) as _,
1030    ];
1031    let records = RecordBatches::try_from_columns(SHOW_CREATE_FLOW_OUTPUT_SCHEMA.clone(), columns)
1032        .context(error::CreateRecordBatchSnafu)?;
1033
1034    Ok(Output::new_with_record_batches(records))
1035}
1036
1037pub fn describe_table(table: TableRef) -> Result<Output> {
1038    let table_info = table.table_info();
1039    let columns_schemas = table_info.meta.schema.column_schemas();
1040    let columns = vec![
1041        describe_column_names(columns_schemas),
1042        describe_column_types(columns_schemas),
1043        describe_column_keys(columns_schemas, &table_info.meta.primary_key_indices),
1044        describe_column_nullables(columns_schemas),
1045        describe_column_defaults(columns_schemas),
1046        describe_column_semantic_types(columns_schemas, &table_info.meta.primary_key_indices),
1047    ];
1048    let records = RecordBatches::try_from_columns(DESCRIBE_TABLE_OUTPUT_SCHEMA.clone(), columns)
1049        .context(error::CreateRecordBatchSnafu)?;
1050    Ok(Output::new_with_record_batches(records))
1051}
1052
1053fn describe_column_names(columns_schemas: &[ColumnSchema]) -> VectorRef {
1054    Arc::new(StringVector::from_iterator(
1055        columns_schemas.iter().map(|cs| cs.name.as_str()),
1056    ))
1057}
1058
1059fn describe_column_types(columns_schemas: &[ColumnSchema]) -> VectorRef {
1060    Arc::new(StringVector::from(
1061        columns_schemas
1062            .iter()
1063            .map(|cs| cs.data_type.name())
1064            .collect::<Vec<_>>(),
1065    ))
1066}
1067
1068fn describe_column_keys(
1069    columns_schemas: &[ColumnSchema],
1070    primary_key_indices: &[usize],
1071) -> VectorRef {
1072    Arc::new(StringVector::from_iterator(
1073        columns_schemas.iter().enumerate().map(|(i, cs)| {
1074            if cs.is_time_index() || primary_key_indices.contains(&i) {
1075                PRI_KEY
1076            } else {
1077                ""
1078            }
1079        }),
1080    ))
1081}
1082
1083fn describe_column_nullables(columns_schemas: &[ColumnSchema]) -> VectorRef {
1084    Arc::new(StringVector::from_iterator(columns_schemas.iter().map(
1085        |cs| {
1086            if cs.is_nullable() {
1087                YES_STR
1088            } else {
1089                NO_STR
1090            }
1091        },
1092    )))
1093}
1094
1095fn describe_column_defaults(columns_schemas: &[ColumnSchema]) -> VectorRef {
1096    Arc::new(StringVector::from(
1097        columns_schemas
1098            .iter()
1099            .map(|cs| {
1100                cs.default_constraint()
1101                    .map_or(String::from(""), |dc| dc.to_string())
1102            })
1103            .collect::<Vec<String>>(),
1104    ))
1105}
1106
1107fn describe_column_semantic_types(
1108    columns_schemas: &[ColumnSchema],
1109    primary_key_indices: &[usize],
1110) -> VectorRef {
1111    Arc::new(StringVector::from_iterator(
1112        columns_schemas.iter().enumerate().map(|(i, cs)| {
1113            if primary_key_indices.contains(&i) {
1114                SEMANTIC_TYPE_PRIMARY_KEY
1115            } else if cs.is_time_index() {
1116                SEMANTIC_TYPE_TIME_INDEX
1117            } else {
1118                SEMANTIC_TYPE_FIELD
1119            }
1120        }),
1121    ))
1122}
1123
1124// lists files in the frontend to reduce unnecessary scan requests repeated in each datanode.
1125pub async fn prepare_file_table_files(
1126    options: &HashMap<String, String>,
1127) -> Result<(ObjectStore, Vec<String>)> {
1128    let url = options
1129        .get(FILE_TABLE_LOCATION_KEY)
1130        .context(error::MissingRequiredFieldSnafu {
1131            name: FILE_TABLE_LOCATION_KEY,
1132        })?;
1133
1134    let (dir, filename) = find_dir_and_filename(url);
1135    let source = if let Some(filename) = filename {
1136        Source::Filename(filename)
1137    } else {
1138        Source::Dir
1139    };
1140    let regex = options
1141        .get(FILE_TABLE_PATTERN_KEY)
1142        .map(|x| Regex::new(x))
1143        .transpose()
1144        .context(error::BuildRegexSnafu)?;
1145    let object_store = build_backend(url, options).context(error::BuildBackendSnafu)?;
1146    let lister = Lister::new(object_store.clone(), source, dir, regex);
1147    // If we scan files in a directory every time the database restarts,
1148    // then it might lead to a potential undefined behavior:
1149    // If a user adds a file with an incompatible schema to that directory,
1150    // it will make the external table unavailable.
1151    let files = lister
1152        .list()
1153        .await
1154        .context(error::ListObjectsSnafu)?
1155        .into_iter()
1156        .filter_map(|entry| {
1157            if entry.path().ends_with('/') {
1158                None
1159            } else {
1160                Some(entry.path().to_string())
1161            }
1162        })
1163        .collect::<Vec<_>>();
1164    Ok((object_store, files))
1165}
1166
1167pub async fn infer_file_table_schema(
1168    object_store: &ObjectStore,
1169    files: &[String],
1170    options: &HashMap<String, String>,
1171) -> Result<RawSchema> {
1172    let format = parse_file_table_format(options)?;
1173    let merged = infer_schemas(object_store, files, format.as_ref())
1174        .await
1175        .context(error::InferSchemaSnafu)?;
1176    Ok(RawSchema::from(
1177        &Schema::try_from(merged).context(error::ConvertSchemaSnafu)?,
1178    ))
1179}
1180
1181// Converts the file column schemas to table column schemas.
1182// Returns the column schemas and the time index column name.
1183//
1184// More specifically, this function will do the following:
1185// 1. Add a default time index column if there is no time index column
1186//    in the file column schemas, or
1187// 2. If the file column schemas contain a column with name conflicts with
1188//    the default time index column, it will replace the column schema
1189//    with the default one.
1190pub fn file_column_schemas_to_table(
1191    file_column_schemas: &[ColumnSchema],
1192) -> (Vec<ColumnSchema>, String) {
1193    let mut column_schemas = file_column_schemas.to_owned();
1194    if let Some(time_index_column) = column_schemas.iter().find(|c| c.is_time_index()) {
1195        let time_index = time_index_column.name.clone();
1196        return (column_schemas, time_index);
1197    }
1198
1199    let timestamp_type = ConcreteDataType::timestamp_millisecond_datatype();
1200    let default_zero = Value::Timestamp(Timestamp::new_millisecond(0));
1201    let timestamp_column_schema = ColumnSchema::new(GREPTIME_TIMESTAMP, timestamp_type, false)
1202        .with_time_index(true)
1203        .with_default_constraint(Some(ColumnDefaultConstraint::Value(default_zero)))
1204        .unwrap();
1205
1206    if let Some(column_schema) = column_schemas
1207        .iter_mut()
1208        .find(|column_schema| column_schema.name == GREPTIME_TIMESTAMP)
1209    {
1210        // Replace the column schema with the default one
1211        *column_schema = timestamp_column_schema;
1212    } else {
1213        column_schemas.push(timestamp_column_schema);
1214    }
1215
1216    (column_schemas, GREPTIME_TIMESTAMP.to_string())
1217}
1218
1219/// This function checks if the column schemas from a file can be matched with
1220/// the column schemas of a table.
1221///
1222/// More specifically, for each column seen in the table schema,
1223/// - If the same column does exist in the file schema, it checks if the data
1224///   type of the file column can be casted into the form of the table column.
1225/// - If the same column does not exist in the file schema, it checks if the
1226///   table column is nullable or has a default constraint.
1227pub fn check_file_to_table_schema_compatibility(
1228    file_column_schemas: &[ColumnSchema],
1229    table_column_schemas: &[ColumnSchema],
1230) -> Result<()> {
1231    let file_schemas_map = file_column_schemas
1232        .iter()
1233        .map(|s| (s.name.clone(), s))
1234        .collect::<HashMap<_, _>>();
1235
1236    for table_column in table_column_schemas {
1237        if let Some(file_column) = file_schemas_map.get(&table_column.name) {
1238            // TODO(zhongzc): a temporary solution, we should use `can_cast_to` once it's ready.
1239            ensure!(
1240                file_column
1241                    .data_type
1242                    .can_arrow_type_cast_to(&table_column.data_type),
1243                error::ColumnSchemaIncompatibleSnafu {
1244                    column: table_column.name.clone(),
1245                    file_type: file_column.data_type.clone(),
1246                    table_type: table_column.data_type.clone(),
1247                }
1248            );
1249        } else {
1250            ensure!(
1251                table_column.is_nullable() || table_column.default_constraint().is_some(),
1252                error::ColumnSchemaNoDefaultSnafu {
1253                    column: table_column.name.clone(),
1254                }
1255            );
1256        }
1257    }
1258
1259    Ok(())
1260}
1261
1262fn parse_file_table_format(options: &HashMap<String, String>) -> Result<Box<dyn FileFormat>> {
1263    Ok(
1264        match Format::try_from(options).context(error::ParseFileFormatSnafu)? {
1265            Format::Csv(format) => Box::new(format),
1266            Format::Json(format) => Box::new(format),
1267            Format::Parquet(format) => Box::new(format),
1268            Format::Orc(format) => Box::new(format),
1269        },
1270    )
1271}
1272
1273pub async fn show_processlist(
1274    stmt: ShowProcessList,
1275    query_engine: &QueryEngineRef,
1276    catalog_manager: &CatalogManagerRef,
1277    query_ctx: QueryContextRef,
1278) -> Result<Output> {
1279    let projects = if stmt.full {
1280        vec![
1281            (process_list::ID, "Id"),
1282            (process_list::CATALOG, "Catalog"),
1283            (process_list::SCHEMAS, "Schema"),
1284            (process_list::CLIENT, "Client"),
1285            (process_list::FRONTEND, "Frontend"),
1286            (process_list::START_TIMESTAMP, "Start Time"),
1287            (process_list::ELAPSED_TIME, "Elapsed Time"),
1288            (process_list::QUERY, "Query"),
1289        ]
1290    } else {
1291        vec![
1292            (process_list::ID, "Id"),
1293            (process_list::CATALOG, "Catalog"),
1294            (process_list::QUERY, "Query"),
1295            (process_list::ELAPSED_TIME, "Elapsed Time"),
1296        ]
1297    };
1298
1299    let filters = vec![];
1300    let like_field = None;
1301    let sort = vec![col("id").sort(true, true)];
1302    query_from_information_schema_table(
1303        query_engine,
1304        catalog_manager,
1305        query_ctx.clone(),
1306        "process_list",
1307        vec![],
1308        projects.clone(),
1309        filters,
1310        like_field,
1311        sort,
1312        ShowKind::All,
1313    )
1314    .await
1315}
1316
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use common_query::{Output, OutputData};
    use common_recordbatch::{RecordBatch, RecordBatches};
    use common_time::timestamp::TimeUnit;
    use common_time::Timezone;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema, SchemaRef};
    use datatypes::vectors::{StringVector, TimestampMillisecondVector, UInt32Vector, VectorRef};
    use session::context::QueryContextBuilder;
    use snafu::ResultExt;
    use sql::ast::{Ident, ObjectName};
    use sql::statements::show::ShowVariables;
    use table::test_util::MemTable;
    use table::TableRef;

    use super::show_variable;
    use crate::error;
    use crate::error::Result;
    use crate::sql::{
        describe_table, DESCRIBE_TABLE_OUTPUT_SCHEMA, NO_STR, SEMANTIC_TYPE_FIELD,
        SEMANTIC_TYPE_TIME_INDEX, YES_STR,
    };

    /// Describes a two-column table (a nullable `UInt32` field and a
    /// non-nullable time index with a function default) and checks every
    /// output column.
    #[test]
    fn test_describe_table_multiple_columns() -> Result<()> {
        let field_column = ColumnSchema::new("t1", ConcreteDataType::uint32_datatype(), true);
        let time_index_column = ColumnSchema::new(
            "t2",
            ConcreteDataType::timestamp_datatype(TimeUnit::Millisecond),
            false,
        )
        .with_default_constraint(Some(ColumnDefaultConstraint::Function(String::from(
            "current_timestamp()",
        ))))
        .unwrap()
        .with_time_index(true);

        let data = vec![
            Arc::new(UInt32Vector::from_slice([0])) as VectorRef,
            Arc::new(TimestampMillisecondVector::from_slice([0])) as VectorRef,
        ];

        let expected_columns = vec![
            Arc::new(StringVector::from(vec!["t1", "t2"])) as VectorRef,
            Arc::new(StringVector::from(vec!["UInt32", "TimestampMillisecond"])) as VectorRef,
            Arc::new(StringVector::from(vec!["", "PRI"])) as VectorRef,
            Arc::new(StringVector::from(vec![YES_STR, NO_STR])) as VectorRef,
            Arc::new(StringVector::from(vec!["", "current_timestamp()"])) as VectorRef,
            Arc::new(StringVector::from(vec![
                SEMANTIC_TYPE_FIELD,
                SEMANTIC_TYPE_TIME_INDEX,
            ])) as VectorRef,
        ];

        describe_table_test_by_schema(
            "test_table",
            vec![field_column, time_index_column],
            data,
            expected_columns,
        )
    }

    /// Builds a table from `schema` and `data`, describes it, and asserts the
    /// result equals `expected_columns` laid out per
    /// `DESCRIBE_TABLE_OUTPUT_SCHEMA`.
    fn describe_table_test_by_schema(
        table_name: &str,
        schema: Vec<ColumnSchema>,
        data: Vec<VectorRef>,
        expected_columns: Vec<VectorRef>,
    ) -> Result<()> {
        let table = prepare_describe_table(table_name, SchemaRef::new(Schema::new(schema)), data);

        let expected =
            RecordBatches::try_from_columns(DESCRIBE_TABLE_OUTPUT_SCHEMA.clone(), expected_columns)
                .context(error::CreateRecordBatchSnafu)?;

        let OutputData::RecordBatches(actual) = describe_table(table)?.data else {
            panic!("describe table must return record batch");
        };
        assert_eq!(actual.take(), expected.take());

        Ok(())
    }

    /// Wraps `data` into a single record batch and exposes it as a `MemTable`.
    fn prepare_describe_table(
        table_name: &str,
        table_schema: SchemaRef,
        data: Vec<VectorRef>,
    ) -> TableRef {
        MemTable::table(table_name, RecordBatch::new(table_schema, data).unwrap())
    }

    /// Checks the time zone variables: the system ones report `UTC`, the
    /// session ones report the session time zone, and names containing spaces
    /// are rejected.
    #[test]
    fn test_show_variable() {
        const SESSION_TZ: &str = "Asia/Shanghai";

        let accepted = [
            ("SYSTEM_TIME_ZONE", "UTC"),
            ("SYSTEM_TIMEZONE", "UTC"),
            ("TIME_ZONE", "Asia/Shanghai"),
            ("TIMEZONE", "Asia/Shanghai"),
        ];
        for (variable, expected) in accepted {
            assert_eq!(exec_show_variable(variable, SESSION_TZ).unwrap(), expected);
        }

        for variable in ["TIME ZONE", "SYSTEM TIME ZONE"] {
            assert!(exec_show_variable(variable, SESSION_TZ).is_err());
        }
    }

    /// Runs `SHOW VARIABLES` for `variable` in a session whose time zone is
    /// `tz` and returns the first value of the first output column as a
    /// string.
    fn exec_show_variable(variable: &str, tz: &str) -> Result<String> {
        let stmt = ShowVariables {
            variable: ObjectName::from(vec![Ident::new(variable)]),
        };
        let session_timezone = Timezone::from_tz_string(tz).unwrap();
        let ctx = Arc::new(
            QueryContextBuilder::default()
                .timezone(session_timezone)
                .build(),
        );

        let Output { data, .. } = show_variable(stmt, ctx)?;
        let OutputData::RecordBatches(batches) = data else {
            unreachable!()
        };

        let first_batch = batches.take().first().cloned().unwrap();
        Ok(first_batch.column(0).get(0).to_string())
    }
}