query/
sql.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod show_create_table;
16
17use std::collections::HashMap;
18use std::ops::ControlFlow;
19use std::sync::Arc;
20
21use catalog::CatalogManagerRef;
22use catalog::information_schema::{
23    CHARACTER_SETS, COLLATIONS, COLUMNS, FLOWS, KEY_COLUMN_USAGE, REGION_PEERS, SCHEMATA, TABLES,
24    VIEWS, columns, flows, key_column_usage, process_list, region_peers, schemata, tables,
25};
26use common_catalog::consts::{
27    INFORMATION_SCHEMA_NAME, SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_PRIMARY_KEY,
28    SEMANTIC_TYPE_TIME_INDEX,
29};
30use common_catalog::format_full_table_name;
31use common_datasource::file_format::{FileFormat, Format, infer_schemas};
32use common_datasource::lister::{Lister, Source};
33use common_datasource::object_store::build_backend;
34use common_datasource::util::find_dir_and_filename;
35use common_meta::SchemaOptions;
36use common_meta::key::flow::flow_info::FlowInfoValue;
37use common_query::Output;
38use common_query::prelude::greptime_timestamp;
39use common_recordbatch::RecordBatches;
40use common_recordbatch::adapter::RecordBatchStreamAdapter;
41use common_time::Timestamp;
42use common_time::timezone::get_timezone;
43use datafusion::common::ScalarValue;
44use datafusion::prelude::SessionContext;
45use datafusion_expr::{Expr, SortExpr, case, col, lit};
46use datatypes::prelude::*;
47use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, RawSchema, Schema};
48use datatypes::vectors::StringVector;
49use itertools::Itertools;
50use object_store::ObjectStore;
51use once_cell::sync::Lazy;
52use regex::Regex;
53use session::context::{Channel, QueryContextRef};
54pub use show_create_table::create_table_stmt;
55use snafu::{OptionExt, ResultExt, ensure};
56use sql::ast::{Ident, visit_expressions_mut};
57use sql::parser::ParserContext;
58use sql::statements::OptionMap;
59use sql::statements::create::{CreateDatabase, CreateFlow, CreateView, Partitions, SqlOrTql};
60use sql::statements::show::{
61    ShowColumns, ShowDatabases, ShowFlows, ShowIndex, ShowKind, ShowProcessList, ShowRegion,
62    ShowTableStatus, ShowTables, ShowVariables, ShowViews,
63};
64use sql::statements::statement::Statement;
65use sqlparser::ast::ObjectName;
66use store_api::metric_engine_consts::{is_metric_engine, is_metric_engine_internal_column};
67use table::TableRef;
68use table::requests::{FILE_TABLE_LOCATION_KEY, FILE_TABLE_PATTERN_KEY};
69
70use crate::QueryEngineRef;
71use crate::error::{self, Result, UnsupportedVariableSnafu};
72use crate::planner::DfLogicalPlanner;
73
74const SCHEMAS_COLUMN: &str = "Database";
75const OPTIONS_COLUMN: &str = "Options";
76const VIEWS_COLUMN: &str = "Views";
77const FLOWS_COLUMN: &str = "Flows";
78const FIELD_COLUMN: &str = "Field";
79const TABLE_TYPE_COLUMN: &str = "Table_type";
80const COLUMN_NAME_COLUMN: &str = "Column";
81const COLUMN_GREPTIME_TYPE_COLUMN: &str = "Greptime_type";
82const COLUMN_TYPE_COLUMN: &str = "Type";
83const COLUMN_KEY_COLUMN: &str = "Key";
84const COLUMN_EXTRA_COLUMN: &str = "Extra";
85const COLUMN_PRIVILEGES_COLUMN: &str = "Privileges";
86const COLUMN_COLLATION_COLUMN: &str = "Collation";
87const COLUMN_NULLABLE_COLUMN: &str = "Null";
88const COLUMN_DEFAULT_COLUMN: &str = "Default";
89const COLUMN_COMMENT_COLUMN: &str = "Comment";
90const COLUMN_SEMANTIC_TYPE_COLUMN: &str = "Semantic Type";
91
92const YES_STR: &str = "YES";
93const NO_STR: &str = "NO";
94const PRI_KEY: &str = "PRI";
95const TIME_INDEX: &str = "TIME INDEX";
96
97/// SHOW index columns
98const INDEX_TABLE_COLUMN: &str = "Table";
99const INDEX_NONT_UNIQUE_COLUMN: &str = "Non_unique";
100const INDEX_CARDINALITY_COLUMN: &str = "Cardinality";
101const INDEX_SUB_PART_COLUMN: &str = "Sub_part";
102const INDEX_PACKED_COLUMN: &str = "Packed";
103const INDEX_INDEX_TYPE_COLUMN: &str = "Index_type";
104const INDEX_COMMENT_COLUMN: &str = "Index_comment";
105const INDEX_VISIBLE_COLUMN: &str = "Visible";
106const INDEX_EXPRESSION_COLUMN: &str = "Expression";
107const INDEX_KEY_NAME_COLUMN: &str = "Key_name";
108const INDEX_SEQ_IN_INDEX_COLUMN: &str = "Seq_in_index";
109const INDEX_COLUMN_NAME_COLUMN: &str = "Column_name";
110
111static DESCRIBE_TABLE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
112    Arc::new(Schema::new(vec![
113        ColumnSchema::new(
114            COLUMN_NAME_COLUMN,
115            ConcreteDataType::string_datatype(),
116            false,
117        ),
118        ColumnSchema::new(
119            COLUMN_TYPE_COLUMN,
120            ConcreteDataType::string_datatype(),
121            false,
122        ),
123        ColumnSchema::new(COLUMN_KEY_COLUMN, ConcreteDataType::string_datatype(), true),
124        ColumnSchema::new(
125            COLUMN_NULLABLE_COLUMN,
126            ConcreteDataType::string_datatype(),
127            false,
128        ),
129        ColumnSchema::new(
130            COLUMN_DEFAULT_COLUMN,
131            ConcreteDataType::string_datatype(),
132            false,
133        ),
134        ColumnSchema::new(
135            COLUMN_SEMANTIC_TYPE_COLUMN,
136            ConcreteDataType::string_datatype(),
137            false,
138        ),
139    ]))
140});
141
142static SHOW_CREATE_DATABASE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
143    Arc::new(Schema::new(vec![
144        ColumnSchema::new("Database", ConcreteDataType::string_datatype(), false),
145        ColumnSchema::new(
146            "Create Database",
147            ConcreteDataType::string_datatype(),
148            false,
149        ),
150    ]))
151});
152
153static SHOW_CREATE_TABLE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
154    Arc::new(Schema::new(vec![
155        ColumnSchema::new("Table", ConcreteDataType::string_datatype(), false),
156        ColumnSchema::new("Create Table", ConcreteDataType::string_datatype(), false),
157    ]))
158});
159
160static SHOW_CREATE_FLOW_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
161    Arc::new(Schema::new(vec![
162        ColumnSchema::new("Flow", ConcreteDataType::string_datatype(), false),
163        ColumnSchema::new("Create Flow", ConcreteDataType::string_datatype(), false),
164    ]))
165});
166
167static SHOW_CREATE_VIEW_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
168    Arc::new(Schema::new(vec![
169        ColumnSchema::new("View", ConcreteDataType::string_datatype(), false),
170        ColumnSchema::new("Create View", ConcreteDataType::string_datatype(), false),
171    ]))
172});
173
174fn null() -> Expr {
175    lit(ScalarValue::Null)
176}
177
178pub async fn show_databases(
179    stmt: ShowDatabases,
180    query_engine: &QueryEngineRef,
181    catalog_manager: &CatalogManagerRef,
182    query_ctx: QueryContextRef,
183) -> Result<Output> {
184    let projects = if stmt.full {
185        vec![
186            (schemata::SCHEMA_NAME, SCHEMAS_COLUMN),
187            (schemata::SCHEMA_OPTS, OPTIONS_COLUMN),
188        ]
189    } else {
190        vec![(schemata::SCHEMA_NAME, SCHEMAS_COLUMN)]
191    };
192
193    let filters = vec![col(schemata::CATALOG_NAME).eq(lit(query_ctx.current_catalog()))];
194    let like_field = Some(schemata::SCHEMA_NAME);
195    let sort = vec![col(schemata::SCHEMA_NAME).sort(true, true)];
196
197    query_from_information_schema_table(
198        query_engine,
199        catalog_manager,
200        query_ctx,
201        SCHEMATA,
202        vec![],
203        projects,
204        filters,
205        like_field,
206        sort,
207        stmt.kind,
208    )
209    .await
210}
211
212/// Replaces column identifier references in a SQL expression.
213/// Used for backward compatibility where old column names should work with new ones.
214fn replace_column_in_expr(expr: &mut sqlparser::ast::Expr, from_column: &str, to_column: &str) {
215    let _ = visit_expressions_mut(expr, |e| {
216        match e {
217            sqlparser::ast::Expr::Identifier(ident) => {
218                if ident.value.eq_ignore_ascii_case(from_column) {
219                    ident.value = to_column.to_string();
220                }
221            }
222            sqlparser::ast::Expr::CompoundIdentifier(idents) => {
223                if let Some(last) = idents.last_mut()
224                    && last.value.eq_ignore_ascii_case(from_column)
225                {
226                    last.value = to_column.to_string();
227                }
228            }
229            _ => {}
230        }
231        ControlFlow::<()>::Continue(())
232    });
233}
234
235/// Cast a `show` statement execution into a query from tables in  `information_schema`.
236/// - `table_name`: the table name in `information_schema`,
237/// - `projects`: query projection, a list of `(column, renamed_column)`,
238/// - `filters`: filter expressions for query,
239/// - `like_field`: the field to filter by the predicate `ShowKind::Like`,
240/// - `sort`: sort the results by the specified sorting expressions,
241/// - `kind`: the show kind
242#[allow(clippy::too_many_arguments)]
243async fn query_from_information_schema_table(
244    query_engine: &QueryEngineRef,
245    catalog_manager: &CatalogManagerRef,
246    query_ctx: QueryContextRef,
247    table_name: &str,
248    select: Vec<Expr>,
249    projects: Vec<(&str, &str)>,
250    filters: Vec<Expr>,
251    like_field: Option<&str>,
252    sort: Vec<SortExpr>,
253    kind: ShowKind,
254) -> Result<Output> {
255    let table = catalog_manager
256        .table(
257            query_ctx.current_catalog(),
258            INFORMATION_SCHEMA_NAME,
259            table_name,
260            Some(&query_ctx),
261        )
262        .await
263        .context(error::CatalogSnafu)?
264        .with_context(|| error::TableNotFoundSnafu {
265            table: format_full_table_name(
266                query_ctx.current_catalog(),
267                INFORMATION_SCHEMA_NAME,
268                table_name,
269            ),
270        })?;
271
272    let dataframe = query_engine.read_table(table)?;
273
274    // Apply filters
275    let dataframe = filters.into_iter().try_fold(dataframe, |df, expr| {
276        df.filter(expr).context(error::PlanSqlSnafu)
277    })?;
278
279    // Apply `like` predicate if exists
280    let dataframe = if let (ShowKind::Like(ident), Some(field)) = (&kind, like_field) {
281        dataframe
282            .filter(col(field).like(lit(ident.value.clone())))
283            .context(error::PlanSqlSnafu)?
284    } else {
285        dataframe
286    };
287
288    // Apply sorting
289    let dataframe = if sort.is_empty() {
290        dataframe
291    } else {
292        dataframe.sort(sort).context(error::PlanSqlSnafu)?
293    };
294
295    // Apply select
296    let dataframe = if select.is_empty() {
297        if projects.is_empty() {
298            dataframe
299        } else {
300            let projection = projects
301                .iter()
302                .map(|x| col(x.0).alias(x.1))
303                .collect::<Vec<_>>();
304            dataframe.select(projection).context(error::PlanSqlSnafu)?
305        }
306    } else {
307        dataframe.select(select).context(error::PlanSqlSnafu)?
308    };
309
310    // Apply projection
311    let dataframe = projects
312        .into_iter()
313        .try_fold(dataframe, |df, (column, renamed_column)| {
314            df.with_column_renamed(column, renamed_column)
315                .context(error::PlanSqlSnafu)
316        })?;
317
318    let dataframe = match kind {
319        ShowKind::All | ShowKind::Like(_) => {
320            // Like kind is processed above
321            dataframe
322        }
323        ShowKind::Where(filter) => {
324            // Cast the results into VIEW for `where` clause,
325            // which is evaluated against the column names displayed by the SHOW statement.
326            let view = dataframe.into_view();
327            let dataframe = SessionContext::new_with_state(
328                query_engine
329                    .engine_context(query_ctx.clone())
330                    .state()
331                    .clone(),
332            )
333            .read_table(view)?;
334
335            let planner = query_engine.planner();
336            let planner = planner
337                .as_any()
338                .downcast_ref::<DfLogicalPlanner>()
339                .expect("Must be the datafusion planner");
340
341            let filter = planner
342                .sql_to_expr(filter, dataframe.schema(), false, query_ctx)
343                .await?;
344
345            // Apply the `where` clause filters
346            dataframe.filter(filter).context(error::PlanSqlSnafu)?
347        }
348    };
349
350    let stream = dataframe.execute_stream().await?;
351
352    Ok(Output::new_with_stream(Box::pin(
353        RecordBatchStreamAdapter::try_new(stream).context(error::CreateRecordBatchSnafu)?,
354    )))
355}
356
357/// Execute `SHOW COLUMNS` statement.
358pub async fn show_columns(
359    stmt: ShowColumns,
360    query_engine: &QueryEngineRef,
361    catalog_manager: &CatalogManagerRef,
362    query_ctx: QueryContextRef,
363) -> Result<Output> {
364    let schema_name = if let Some(database) = stmt.database {
365        database
366    } else {
367        query_ctx.current_schema()
368    };
369
370    let projects = if stmt.full {
371        vec![
372            (columns::COLUMN_NAME, FIELD_COLUMN),
373            (columns::DATA_TYPE, COLUMN_TYPE_COLUMN),
374            (columns::COLLATION_NAME, COLUMN_COLLATION_COLUMN),
375            (columns::IS_NULLABLE, COLUMN_NULLABLE_COLUMN),
376            (columns::COLUMN_KEY, COLUMN_KEY_COLUMN),
377            (columns::COLUMN_DEFAULT, COLUMN_DEFAULT_COLUMN),
378            (columns::COLUMN_COMMENT, COLUMN_COMMENT_COLUMN),
379            (columns::PRIVILEGES, COLUMN_PRIVILEGES_COLUMN),
380            (columns::EXTRA, COLUMN_EXTRA_COLUMN),
381            (columns::GREPTIME_DATA_TYPE, COLUMN_GREPTIME_TYPE_COLUMN),
382        ]
383    } else {
384        vec![
385            (columns::COLUMN_NAME, FIELD_COLUMN),
386            (columns::DATA_TYPE, COLUMN_TYPE_COLUMN),
387            (columns::IS_NULLABLE, COLUMN_NULLABLE_COLUMN),
388            (columns::COLUMN_KEY, COLUMN_KEY_COLUMN),
389            (columns::COLUMN_DEFAULT, COLUMN_DEFAULT_COLUMN),
390            (columns::EXTRA, COLUMN_EXTRA_COLUMN),
391            (columns::GREPTIME_DATA_TYPE, COLUMN_GREPTIME_TYPE_COLUMN),
392        ]
393    };
394
395    let filters = vec![
396        col(columns::TABLE_NAME).eq(lit(&stmt.table)),
397        col(columns::TABLE_SCHEMA).eq(lit(schema_name.clone())),
398        col(columns::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
399    ];
400    let like_field = Some(columns::COLUMN_NAME);
401    let sort = vec![col(columns::COLUMN_NAME).sort(true, true)];
402
403    query_from_information_schema_table(
404        query_engine,
405        catalog_manager,
406        query_ctx,
407        COLUMNS,
408        vec![],
409        projects,
410        filters,
411        like_field,
412        sort,
413        stmt.kind,
414    )
415    .await
416}
417
418/// Execute `SHOW INDEX` statement.
419pub async fn show_index(
420    stmt: ShowIndex,
421    query_engine: &QueryEngineRef,
422    catalog_manager: &CatalogManagerRef,
423    query_ctx: QueryContextRef,
424) -> Result<Output> {
425    let schema_name = if let Some(database) = stmt.database {
426        database
427    } else {
428        query_ctx.current_schema()
429    };
430
431    let select = vec![
432        col(key_column_usage::TABLE_NAME).alias(INDEX_TABLE_COLUMN),
433        // 1 as `Non_unique`: contain duplicates
434        lit(1).alias(INDEX_NONT_UNIQUE_COLUMN),
435        col(key_column_usage::CONSTRAINT_NAME).alias(INDEX_KEY_NAME_COLUMN),
436        col(key_column_usage::ORDINAL_POSITION).alias(INDEX_SEQ_IN_INDEX_COLUMN),
437        col(key_column_usage::COLUMN_NAME).alias(INDEX_COLUMN_NAME_COLUMN),
438        // How the column is sorted in the index: A (ascending).
439        lit("A").alias(COLUMN_COLLATION_COLUMN),
440        null().alias(INDEX_CARDINALITY_COLUMN),
441        null().alias(INDEX_SUB_PART_COLUMN),
442        null().alias(INDEX_PACKED_COLUMN),
443        // case `constraint_name`
444        //    when 'TIME INDEX' then 'NO'
445        //    else 'YES'
446        // end as `Null`
447        case(col(key_column_usage::CONSTRAINT_NAME))
448            .when(lit(TIME_INDEX), lit(NO_STR))
449            .otherwise(lit(YES_STR))
450            .context(error::PlanSqlSnafu)?
451            .alias(COLUMN_NULLABLE_COLUMN),
452        col(key_column_usage::GREPTIME_INDEX_TYPE).alias(INDEX_INDEX_TYPE_COLUMN),
453        lit("").alias(COLUMN_COMMENT_COLUMN),
454        lit("").alias(INDEX_COMMENT_COLUMN),
455        lit(YES_STR).alias(INDEX_VISIBLE_COLUMN),
456        null().alias(INDEX_EXPRESSION_COLUMN),
457    ];
458
459    let projects = vec![
460        (key_column_usage::TABLE_NAME, INDEX_TABLE_COLUMN),
461        (INDEX_NONT_UNIQUE_COLUMN, INDEX_NONT_UNIQUE_COLUMN),
462        (key_column_usage::CONSTRAINT_NAME, INDEX_KEY_NAME_COLUMN),
463        (
464            key_column_usage::ORDINAL_POSITION,
465            INDEX_SEQ_IN_INDEX_COLUMN,
466        ),
467        (key_column_usage::COLUMN_NAME, INDEX_COLUMN_NAME_COLUMN),
468        (COLUMN_COLLATION_COLUMN, COLUMN_COLLATION_COLUMN),
469        (INDEX_CARDINALITY_COLUMN, INDEX_CARDINALITY_COLUMN),
470        (INDEX_SUB_PART_COLUMN, INDEX_SUB_PART_COLUMN),
471        (INDEX_PACKED_COLUMN, INDEX_PACKED_COLUMN),
472        (COLUMN_NULLABLE_COLUMN, COLUMN_NULLABLE_COLUMN),
473        (
474            key_column_usage::GREPTIME_INDEX_TYPE,
475            INDEX_INDEX_TYPE_COLUMN,
476        ),
477        (COLUMN_COMMENT_COLUMN, COLUMN_COMMENT_COLUMN),
478        (INDEX_COMMENT_COLUMN, INDEX_COMMENT_COLUMN),
479        (INDEX_VISIBLE_COLUMN, INDEX_VISIBLE_COLUMN),
480        (INDEX_EXPRESSION_COLUMN, INDEX_EXPRESSION_COLUMN),
481    ];
482
483    let filters = vec![
484        col(key_column_usage::TABLE_NAME).eq(lit(&stmt.table)),
485        col(key_column_usage::TABLE_SCHEMA).eq(lit(schema_name.clone())),
486        col(key_column_usage::REAL_TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
487    ];
488    let like_field = None;
489    let sort = vec![col(columns::COLUMN_NAME).sort(true, true)];
490
491    query_from_information_schema_table(
492        query_engine,
493        catalog_manager,
494        query_ctx,
495        KEY_COLUMN_USAGE,
496        select,
497        projects,
498        filters,
499        like_field,
500        sort,
501        stmt.kind,
502    )
503    .await
504}
505
506/// Execute `SHOW REGION` statement.
507pub async fn show_region(
508    stmt: ShowRegion,
509    query_engine: &QueryEngineRef,
510    catalog_manager: &CatalogManagerRef,
511    query_ctx: QueryContextRef,
512) -> Result<Output> {
513    let schema_name = if let Some(database) = stmt.database {
514        database
515    } else {
516        query_ctx.current_schema()
517    };
518
519    let filters = vec![
520        col(region_peers::TABLE_NAME).eq(lit(&stmt.table)),
521        col(region_peers::TABLE_SCHEMA).eq(lit(schema_name.clone())),
522        col(region_peers::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
523    ];
524    let projects = vec![
525        (region_peers::TABLE_NAME, "Table"),
526        (region_peers::REGION_ID, "Region"),
527        (region_peers::PEER_ID, "Peer"),
528        (region_peers::IS_LEADER, "Leader"),
529    ];
530
531    let like_field = None;
532    let sort = vec![
533        col(columns::REGION_ID).sort(true, true),
534        col(columns::PEER_ID).sort(true, true),
535    ];
536
537    query_from_information_schema_table(
538        query_engine,
539        catalog_manager,
540        query_ctx,
541        REGION_PEERS,
542        vec![],
543        projects,
544        filters,
545        like_field,
546        sort,
547        stmt.kind,
548    )
549    .await
550}
551
552/// Execute [`ShowTables`] statement and return the [`Output`] if success.
553pub async fn show_tables(
554    stmt: ShowTables,
555    query_engine: &QueryEngineRef,
556    catalog_manager: &CatalogManagerRef,
557    query_ctx: QueryContextRef,
558) -> Result<Output> {
559    let schema_name = if let Some(database) = stmt.database {
560        database
561    } else {
562        query_ctx.current_schema()
563    };
564
565    // MySQL renames `table_name` to `Tables_in_{schema}` for protocol compatibility
566    let tables_column = format!("Tables_in_{}", schema_name);
567    let projects = if stmt.full {
568        vec![
569            (tables::TABLE_NAME, tables_column.as_str()),
570            (tables::TABLE_TYPE, TABLE_TYPE_COLUMN),
571        ]
572    } else {
573        vec![(tables::TABLE_NAME, tables_column.as_str())]
574    };
575    let filters = vec![
576        col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
577        col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
578    ];
579    let like_field = Some(tables::TABLE_NAME);
580    let sort = vec![col(tables::TABLE_NAME).sort(true, true)];
581
582    // Transform the WHERE clause for backward compatibility:
583    // Replace "Tables" with "Tables_in_{schema}" to support old queries
584    let kind = match stmt.kind {
585        ShowKind::Where(mut filter) => {
586            replace_column_in_expr(&mut filter, "Tables", &tables_column);
587            ShowKind::Where(filter)
588        }
589        other => other,
590    };
591
592    query_from_information_schema_table(
593        query_engine,
594        catalog_manager,
595        query_ctx,
596        TABLES,
597        vec![],
598        projects,
599        filters,
600        like_field,
601        sort,
602        kind,
603    )
604    .await
605}
606
607/// Execute [`ShowTableStatus`] statement and return the [`Output`] if success.
608pub async fn show_table_status(
609    stmt: ShowTableStatus,
610    query_engine: &QueryEngineRef,
611    catalog_manager: &CatalogManagerRef,
612    query_ctx: QueryContextRef,
613) -> Result<Output> {
614    let schema_name = if let Some(database) = stmt.database {
615        database
616    } else {
617        query_ctx.current_schema()
618    };
619
620    // Refer to https://dev.mysql.com/doc/refman/8.4/en/show-table-status.html
621    let projects = vec![
622        (tables::TABLE_NAME, "Name"),
623        (tables::ENGINE, "Engine"),
624        (tables::VERSION, "Version"),
625        (tables::ROW_FORMAT, "Row_format"),
626        (tables::TABLE_ROWS, "Rows"),
627        (tables::AVG_ROW_LENGTH, "Avg_row_length"),
628        (tables::DATA_LENGTH, "Data_length"),
629        (tables::MAX_DATA_LENGTH, "Max_data_length"),
630        (tables::INDEX_LENGTH, "Index_length"),
631        (tables::DATA_FREE, "Data_free"),
632        (tables::AUTO_INCREMENT, "Auto_increment"),
633        (tables::CREATE_TIME, "Create_time"),
634        (tables::UPDATE_TIME, "Update_time"),
635        (tables::CHECK_TIME, "Check_time"),
636        (tables::TABLE_COLLATION, "Collation"),
637        (tables::CHECKSUM, "Checksum"),
638        (tables::CREATE_OPTIONS, "Create_options"),
639        (tables::TABLE_COMMENT, "Comment"),
640    ];
641
642    let filters = vec![
643        col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
644        col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
645    ];
646    let like_field = Some(tables::TABLE_NAME);
647    let sort = vec![col(tables::TABLE_NAME).sort(true, true)];
648
649    query_from_information_schema_table(
650        query_engine,
651        catalog_manager,
652        query_ctx,
653        TABLES,
654        vec![],
655        projects,
656        filters,
657        like_field,
658        sort,
659        stmt.kind,
660    )
661    .await
662}
663
664/// Execute `SHOW COLLATION` statement and returns the `Output` if success.
665pub async fn show_collations(
666    kind: ShowKind,
667    query_engine: &QueryEngineRef,
668    catalog_manager: &CatalogManagerRef,
669    query_ctx: QueryContextRef,
670) -> Result<Output> {
671    // Refer to https://dev.mysql.com/doc/refman/8.0/en/show-collation.html
672    let projects = vec![
673        ("collation_name", "Collation"),
674        ("character_set_name", "Charset"),
675        ("id", "Id"),
676        ("is_default", "Default"),
677        ("is_compiled", "Compiled"),
678        ("sortlen", "Sortlen"),
679    ];
680
681    let filters = vec![];
682    let like_field = Some("collation_name");
683    let sort = vec![];
684
685    query_from_information_schema_table(
686        query_engine,
687        catalog_manager,
688        query_ctx,
689        COLLATIONS,
690        vec![],
691        projects,
692        filters,
693        like_field,
694        sort,
695        kind,
696    )
697    .await
698}
699
700/// Execute `SHOW CHARSET` statement and returns the `Output` if success.
701pub async fn show_charsets(
702    kind: ShowKind,
703    query_engine: &QueryEngineRef,
704    catalog_manager: &CatalogManagerRef,
705    query_ctx: QueryContextRef,
706) -> Result<Output> {
707    // Refer to https://dev.mysql.com/doc/refman/8.0/en/show-character-set.html
708    let projects = vec![
709        ("character_set_name", "Charset"),
710        ("description", "Description"),
711        ("default_collate_name", "Default collation"),
712        ("maxlen", "Maxlen"),
713    ];
714
715    let filters = vec![];
716    let like_field = Some("character_set_name");
717    let sort = vec![];
718
719    query_from_information_schema_table(
720        query_engine,
721        catalog_manager,
722        query_ctx,
723        CHARACTER_SETS,
724        vec![],
725        projects,
726        filters,
727        like_field,
728        sort,
729        kind,
730    )
731    .await
732}
733
734pub fn show_variable(stmt: ShowVariables, query_ctx: QueryContextRef) -> Result<Output> {
735    let variable = stmt.variable.to_string().to_uppercase();
736    let value = match variable.as_str() {
737        "SYSTEM_TIME_ZONE" | "SYSTEM_TIMEZONE" => get_timezone(None).to_string(),
738        "TIME_ZONE" | "TIMEZONE" => query_ctx.timezone().to_string(),
739        "READ_PREFERENCE" => query_ctx.read_preference().to_string(),
740        "DATESTYLE" => {
741            let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style();
742            format!("{}, {}", style, order)
743        }
744        "MAX_EXECUTION_TIME" => {
745            if query_ctx.channel() == Channel::Mysql {
746                query_ctx.query_timeout_as_millis().to_string()
747            } else {
748                return UnsupportedVariableSnafu { name: variable }.fail();
749            }
750        }
751        "STATEMENT_TIMEOUT" => {
752            // Add time units to postgres query timeout display.
753            if query_ctx.channel() == Channel::Postgres {
754                let mut timeout = query_ctx.query_timeout_as_millis().to_string();
755                timeout.push_str("ms");
756                timeout
757            } else {
758                return UnsupportedVariableSnafu { name: variable }.fail();
759            }
760        }
761        _ => return UnsupportedVariableSnafu { name: variable }.fail(),
762    };
763    let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
764        variable,
765        ConcreteDataType::string_datatype(),
766        false,
767    )]));
768    let records = RecordBatches::try_from_columns(
769        schema,
770        vec![Arc::new(StringVector::from(vec![value])) as _],
771    )
772    .context(error::CreateRecordBatchSnafu)?;
773    Ok(Output::new_with_record_batches(records))
774}
775
776pub async fn show_status(_query_ctx: QueryContextRef) -> Result<Output> {
777    let schema = Arc::new(Schema::new(vec![
778        ColumnSchema::new("Variable_name", ConcreteDataType::string_datatype(), false),
779        ColumnSchema::new("Value", ConcreteDataType::string_datatype(), true),
780    ]));
781    let records = RecordBatches::try_from_columns(
782        schema,
783        vec![
784            Arc::new(StringVector::from(Vec::<&str>::new())) as _,
785            Arc::new(StringVector::from(Vec::<&str>::new())) as _,
786        ],
787    )
788    .context(error::CreateRecordBatchSnafu)?;
789    Ok(Output::new_with_record_batches(records))
790}
791
792pub async fn show_search_path(_query_ctx: QueryContextRef) -> Result<Output> {
793    let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
794        "search_path",
795        ConcreteDataType::string_datatype(),
796        false,
797    )]));
798    let records = RecordBatches::try_from_columns(
799        schema,
800        vec![Arc::new(StringVector::from(vec![_query_ctx.current_schema()])) as _],
801    )
802    .context(error::CreateRecordBatchSnafu)?;
803    Ok(Output::new_with_record_batches(records))
804}
805
806pub fn show_create_database(database_name: &str, options: OptionMap) -> Result<Output> {
807    let stmt = CreateDatabase {
808        name: ObjectName::from(vec![Ident::new(database_name)]),
809        if_not_exists: true,
810        options,
811    };
812    let sql = format!("{stmt}");
813    let columns = vec![
814        Arc::new(StringVector::from(vec![database_name.to_string()])) as _,
815        Arc::new(StringVector::from(vec![sql])) as _,
816    ];
817    let records =
818        RecordBatches::try_from_columns(SHOW_CREATE_DATABASE_OUTPUT_SCHEMA.clone(), columns)
819            .context(error::CreateRecordBatchSnafu)?;
820    Ok(Output::new_with_record_batches(records))
821}
822
823pub fn show_create_table(
824    table: TableRef,
825    schema_options: Option<SchemaOptions>,
826    partitions: Option<Partitions>,
827    query_ctx: QueryContextRef,
828) -> Result<Output> {
829    let table_info = table.table_info();
830    let table_name = &table_info.name;
831
832    let quote_style = query_ctx.quote_style();
833
834    let mut stmt = create_table_stmt(&table_info, schema_options, quote_style)?;
835    stmt.partitions = partitions.map(|mut p| {
836        p.set_quote(quote_style);
837        p
838    });
839    let sql = format!("{}", stmt);
840    let columns = vec![
841        Arc::new(StringVector::from(vec![table_name.clone()])) as _,
842        Arc::new(StringVector::from(vec![sql])) as _,
843    ];
844    let records = RecordBatches::try_from_columns(SHOW_CREATE_TABLE_OUTPUT_SCHEMA.clone(), columns)
845        .context(error::CreateRecordBatchSnafu)?;
846
847    Ok(Output::new_with_record_batches(records))
848}
849
850pub fn show_create_foreign_table_for_pg(
851    table: TableRef,
852    _query_ctx: QueryContextRef,
853) -> Result<Output> {
854    let table_info = table.table_info();
855
856    let table_meta = &table_info.meta;
857    let table_name = &table_info.name;
858    let schema = &table_info.meta.schema;
859    let is_metric_engine = is_metric_engine(&table_meta.engine);
860
861    let columns = schema
862        .column_schemas()
863        .iter()
864        .filter_map(|c| {
865            if is_metric_engine && is_metric_engine_internal_column(&c.name) {
866                None
867            } else {
868                Some(format!(
869                    "\"{}\" {}",
870                    c.name,
871                    c.data_type.postgres_datatype_name()
872                ))
873            }
874        })
875        .join(",\n  ");
876
877    let sql = format!(
878        r#"CREATE FOREIGN TABLE ft_{} (
879  {}
880)
881SERVER greptimedb
882OPTIONS (table_name '{}')"#,
883        table_name, columns, table_name
884    );
885
886    let columns = vec![
887        Arc::new(StringVector::from(vec![table_name.clone()])) as _,
888        Arc::new(StringVector::from(vec![sql])) as _,
889    ];
890    let records = RecordBatches::try_from_columns(SHOW_CREATE_TABLE_OUTPUT_SCHEMA.clone(), columns)
891        .context(error::CreateRecordBatchSnafu)?;
892
893    Ok(Output::new_with_record_batches(records))
894}
895
896pub fn show_create_view(
897    view_name: ObjectName,
898    definition: &str,
899    query_ctx: QueryContextRef,
900) -> Result<Output> {
901    let mut parser_ctx =
902        ParserContext::new(query_ctx.sql_dialect(), definition).context(error::SqlSnafu)?;
903
904    let Statement::CreateView(create_view) =
905        parser_ctx.parse_statement().context(error::SqlSnafu)?
906    else {
907        // MUST be `CreateView` statement.
908        unreachable!();
909    };
910
911    let stmt = CreateView {
912        name: view_name.clone(),
913        columns: create_view.columns,
914        query: create_view.query,
915        or_replace: create_view.or_replace,
916        if_not_exists: create_view.if_not_exists,
917    };
918
919    let sql = format!("{}", stmt);
920    let columns = vec![
921        Arc::new(StringVector::from(vec![view_name.to_string()])) as _,
922        Arc::new(StringVector::from(vec![sql])) as _,
923    ];
924    let records = RecordBatches::try_from_columns(SHOW_CREATE_VIEW_OUTPUT_SCHEMA.clone(), columns)
925        .context(error::CreateRecordBatchSnafu)?;
926
927    Ok(Output::new_with_record_batches(records))
928}
929
930/// Execute [`ShowViews`] statement and return the [`Output`] if success.
931pub async fn show_views(
932    stmt: ShowViews,
933    query_engine: &QueryEngineRef,
934    catalog_manager: &CatalogManagerRef,
935    query_ctx: QueryContextRef,
936) -> Result<Output> {
937    let schema_name = if let Some(database) = stmt.database {
938        database
939    } else {
940        query_ctx.current_schema()
941    };
942
943    let projects = vec![(tables::TABLE_NAME, VIEWS_COLUMN)];
944    let filters = vec![
945        col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
946        col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
947    ];
948    let like_field = Some(tables::TABLE_NAME);
949    let sort = vec![col(tables::TABLE_NAME).sort(true, true)];
950
951    query_from_information_schema_table(
952        query_engine,
953        catalog_manager,
954        query_ctx,
955        VIEWS,
956        vec![],
957        projects,
958        filters,
959        like_field,
960        sort,
961        stmt.kind,
962    )
963    .await
964}
965
966/// Execute [`ShowFlows`] statement and return the [`Output`] if success.
967pub async fn show_flows(
968    stmt: ShowFlows,
969    query_engine: &QueryEngineRef,
970    catalog_manager: &CatalogManagerRef,
971    query_ctx: QueryContextRef,
972) -> Result<Output> {
973    let projects = vec![(flows::FLOW_NAME, FLOWS_COLUMN)];
974    let filters = vec![col(flows::TABLE_CATALOG).eq(lit(query_ctx.current_catalog()))];
975    let like_field = Some(flows::FLOW_NAME);
976    let sort = vec![col(flows::FLOW_NAME).sort(true, true)];
977
978    query_from_information_schema_table(
979        query_engine,
980        catalog_manager,
981        query_ctx,
982        FLOWS,
983        vec![],
984        projects,
985        filters,
986        like_field,
987        sort,
988        stmt.kind,
989    )
990    .await
991}
992
993#[cfg(feature = "enterprise")]
994pub async fn show_triggers(
995    stmt: sql::statements::show::trigger::ShowTriggers,
996    query_engine: &QueryEngineRef,
997    catalog_manager: &CatalogManagerRef,
998    query_ctx: QueryContextRef,
999) -> Result<Output> {
1000    const TRIGGER_NAME: &str = "trigger_name";
1001    const TRIGGERS_COLUMN: &str = "Triggers";
1002
1003    let projects = vec![(TRIGGER_NAME, TRIGGERS_COLUMN)];
1004    let like_field = Some(TRIGGER_NAME);
1005    let sort = vec![col(TRIGGER_NAME).sort(true, true)];
1006
1007    query_from_information_schema_table(
1008        query_engine,
1009        catalog_manager,
1010        query_ctx,
1011        catalog::information_schema::TRIGGERS,
1012        vec![],
1013        projects,
1014        vec![],
1015        like_field,
1016        sort,
1017        stmt.kind,
1018    )
1019    .await
1020}
1021
1022pub fn show_create_flow(
1023    flow_name: ObjectName,
1024    flow_val: FlowInfoValue,
1025    query_ctx: QueryContextRef,
1026) -> Result<Output> {
1027    let mut parser_ctx =
1028        ParserContext::new(query_ctx.sql_dialect(), flow_val.raw_sql()).context(error::SqlSnafu)?;
1029
1030    let query = parser_ctx.parse_statement().context(error::SqlSnafu)?;
1031
1032    // since prom ql will parse `now()` to a fixed time, we need to not use it for generating raw query
1033    let raw_query = match &query {
1034        Statement::Tql(_) => flow_val.raw_sql().clone(),
1035        _ => query.to_string(),
1036    };
1037
1038    let query = Box::new(SqlOrTql::try_from_statement(query, &raw_query).context(error::SqlSnafu)?);
1039
1040    let comment = if flow_val.comment().is_empty() {
1041        None
1042    } else {
1043        Some(flow_val.comment().clone())
1044    };
1045
1046    let stmt = CreateFlow {
1047        flow_name,
1048        sink_table_name: ObjectName::from(vec![Ident::new(&flow_val.sink_table_name().table_name)]),
1049        // notice we don't want `OR REPLACE` and `IF NOT EXISTS` in same sql since it's unclear what to do
1050        // so we set `or_replace` to false.
1051        or_replace: false,
1052        if_not_exists: true,
1053        expire_after: flow_val.expire_after(),
1054        eval_interval: flow_val.eval_interval(),
1055        comment,
1056        query,
1057    };
1058
1059    let sql = format!("{}", stmt);
1060    let columns = vec![
1061        Arc::new(StringVector::from(vec![flow_val.flow_name().clone()])) as _,
1062        Arc::new(StringVector::from(vec![sql])) as _,
1063    ];
1064    let records = RecordBatches::try_from_columns(SHOW_CREATE_FLOW_OUTPUT_SCHEMA.clone(), columns)
1065        .context(error::CreateRecordBatchSnafu)?;
1066
1067    Ok(Output::new_with_record_batches(records))
1068}
1069
1070pub fn describe_table(table: TableRef) -> Result<Output> {
1071    let table_info = table.table_info();
1072    let columns_schemas = table_info.meta.schema.column_schemas();
1073    let columns = vec![
1074        describe_column_names(columns_schemas),
1075        describe_column_types(columns_schemas),
1076        describe_column_keys(columns_schemas, &table_info.meta.primary_key_indices),
1077        describe_column_nullables(columns_schemas),
1078        describe_column_defaults(columns_schemas),
1079        describe_column_semantic_types(columns_schemas, &table_info.meta.primary_key_indices),
1080    ];
1081    let records = RecordBatches::try_from_columns(DESCRIBE_TABLE_OUTPUT_SCHEMA.clone(), columns)
1082        .context(error::CreateRecordBatchSnafu)?;
1083    Ok(Output::new_with_record_batches(records))
1084}
1085
1086fn describe_column_names(columns_schemas: &[ColumnSchema]) -> VectorRef {
1087    Arc::new(StringVector::from_iterator(
1088        columns_schemas.iter().map(|cs| cs.name.as_str()),
1089    ))
1090}
1091
1092fn describe_column_types(columns_schemas: &[ColumnSchema]) -> VectorRef {
1093    Arc::new(StringVector::from(
1094        columns_schemas
1095            .iter()
1096            .map(|cs| cs.data_type.name())
1097            .collect::<Vec<_>>(),
1098    ))
1099}
1100
1101fn describe_column_keys(
1102    columns_schemas: &[ColumnSchema],
1103    primary_key_indices: &[usize],
1104) -> VectorRef {
1105    Arc::new(StringVector::from_iterator(
1106        columns_schemas.iter().enumerate().map(|(i, cs)| {
1107            if cs.is_time_index() || primary_key_indices.contains(&i) {
1108                PRI_KEY
1109            } else {
1110                ""
1111            }
1112        }),
1113    ))
1114}
1115
1116fn describe_column_nullables(columns_schemas: &[ColumnSchema]) -> VectorRef {
1117    Arc::new(StringVector::from_iterator(columns_schemas.iter().map(
1118        |cs| {
1119            if cs.is_nullable() { YES_STR } else { NO_STR }
1120        },
1121    )))
1122}
1123
1124fn describe_column_defaults(columns_schemas: &[ColumnSchema]) -> VectorRef {
1125    Arc::new(StringVector::from(
1126        columns_schemas
1127            .iter()
1128            .map(|cs| {
1129                cs.default_constraint()
1130                    .map_or(String::from(""), |dc| dc.to_string())
1131            })
1132            .collect::<Vec<String>>(),
1133    ))
1134}
1135
1136fn describe_column_semantic_types(
1137    columns_schemas: &[ColumnSchema],
1138    primary_key_indices: &[usize],
1139) -> VectorRef {
1140    Arc::new(StringVector::from_iterator(
1141        columns_schemas.iter().enumerate().map(|(i, cs)| {
1142            if primary_key_indices.contains(&i) {
1143                SEMANTIC_TYPE_PRIMARY_KEY
1144            } else if cs.is_time_index() {
1145                SEMANTIC_TYPE_TIME_INDEX
1146            } else {
1147                SEMANTIC_TYPE_FIELD
1148            }
1149        }),
1150    ))
1151}
1152
1153// lists files in the frontend to reduce unnecessary scan requests repeated in each datanode.
1154pub async fn prepare_file_table_files(
1155    options: &HashMap<String, String>,
1156) -> Result<(ObjectStore, Vec<String>)> {
1157    let url = options
1158        .get(FILE_TABLE_LOCATION_KEY)
1159        .context(error::MissingRequiredFieldSnafu {
1160            name: FILE_TABLE_LOCATION_KEY,
1161        })?;
1162
1163    let (dir, filename) = find_dir_and_filename(url);
1164    let source = if let Some(filename) = filename {
1165        Source::Filename(filename)
1166    } else {
1167        Source::Dir
1168    };
1169    let regex = options
1170        .get(FILE_TABLE_PATTERN_KEY)
1171        .map(|x| Regex::new(x))
1172        .transpose()
1173        .context(error::BuildRegexSnafu)?;
1174    let object_store = build_backend(url, options).context(error::BuildBackendSnafu)?;
1175    let lister = Lister::new(object_store.clone(), source, dir, regex);
1176    // If we scan files in a directory every time the database restarts,
1177    // then it might lead to a potential undefined behavior:
1178    // If a user adds a file with an incompatible schema to that directory,
1179    // it will make the external table unavailable.
1180    let files = lister
1181        .list()
1182        .await
1183        .context(error::ListObjectsSnafu)?
1184        .into_iter()
1185        .filter_map(|entry| {
1186            if entry.path().ends_with('/') {
1187                None
1188            } else {
1189                Some(entry.path().to_string())
1190            }
1191        })
1192        .collect::<Vec<_>>();
1193    Ok((object_store, files))
1194}
1195
1196pub async fn infer_file_table_schema(
1197    object_store: &ObjectStore,
1198    files: &[String],
1199    options: &HashMap<String, String>,
1200) -> Result<RawSchema> {
1201    let format = parse_file_table_format(options)?;
1202    let merged = infer_schemas(object_store, files, format.as_ref())
1203        .await
1204        .context(error::InferSchemaSnafu)?;
1205    Ok(RawSchema::from(
1206        &Schema::try_from(merged).context(error::ConvertSchemaSnafu)?,
1207    ))
1208}
1209
1210// Converts the file column schemas to table column schemas.
1211// Returns the column schemas and the time index column name.
1212//
1213// More specifically, this function will do the following:
1214// 1. Add a default time index column if there is no time index column
1215//    in the file column schemas, or
1216// 2. If the file column schemas contain a column with name conflicts with
1217//    the default time index column, it will replace the column schema
1218//    with the default one.
1219pub fn file_column_schemas_to_table(
1220    file_column_schemas: &[ColumnSchema],
1221) -> (Vec<ColumnSchema>, String) {
1222    let mut column_schemas = file_column_schemas.to_owned();
1223    if let Some(time_index_column) = column_schemas.iter().find(|c| c.is_time_index()) {
1224        let time_index = time_index_column.name.clone();
1225        return (column_schemas, time_index);
1226    }
1227
1228    let timestamp_type = ConcreteDataType::timestamp_millisecond_datatype();
1229    let default_zero = Value::Timestamp(Timestamp::new_millisecond(0));
1230    let timestamp_column_schema = ColumnSchema::new(greptime_timestamp(), timestamp_type, false)
1231        .with_time_index(true)
1232        .with_default_constraint(Some(ColumnDefaultConstraint::Value(default_zero)))
1233        .unwrap();
1234
1235    if let Some(column_schema) = column_schemas
1236        .iter_mut()
1237        .find(|column_schema| column_schema.name == greptime_timestamp())
1238    {
1239        // Replace the column schema with the default one
1240        *column_schema = timestamp_column_schema;
1241    } else {
1242        column_schemas.push(timestamp_column_schema);
1243    }
1244
1245    (column_schemas, greptime_timestamp().to_string())
1246}
1247
1248/// This function checks if the column schemas from a file can be matched with
1249/// the column schemas of a table.
1250///
1251/// More specifically, for each column seen in the table schema,
1252/// - If the same column does exist in the file schema, it checks if the data
1253///   type of the file column can be casted into the form of the table column.
1254/// - If the same column does not exist in the file schema, it checks if the
1255///   table column is nullable or has a default constraint.
1256pub fn check_file_to_table_schema_compatibility(
1257    file_column_schemas: &[ColumnSchema],
1258    table_column_schemas: &[ColumnSchema],
1259) -> Result<()> {
1260    let file_schemas_map = file_column_schemas
1261        .iter()
1262        .map(|s| (s.name.clone(), s))
1263        .collect::<HashMap<_, _>>();
1264
1265    for table_column in table_column_schemas {
1266        if let Some(file_column) = file_schemas_map.get(&table_column.name) {
1267            // TODO(zhongzc): a temporary solution, we should use `can_cast_to` once it's ready.
1268            ensure!(
1269                file_column
1270                    .data_type
1271                    .can_arrow_type_cast_to(&table_column.data_type),
1272                error::ColumnSchemaIncompatibleSnafu {
1273                    column: table_column.name.clone(),
1274                    file_type: file_column.data_type.clone(),
1275                    table_type: table_column.data_type.clone(),
1276                }
1277            );
1278        } else {
1279            ensure!(
1280                table_column.is_nullable() || table_column.default_constraint().is_some(),
1281                error::ColumnSchemaNoDefaultSnafu {
1282                    column: table_column.name.clone(),
1283                }
1284            );
1285        }
1286    }
1287
1288    Ok(())
1289}
1290
1291fn parse_file_table_format(options: &HashMap<String, String>) -> Result<Box<dyn FileFormat>> {
1292    Ok(
1293        match Format::try_from(options).context(error::ParseFileFormatSnafu)? {
1294            Format::Csv(format) => Box::new(format),
1295            Format::Json(format) => Box::new(format),
1296            Format::Parquet(format) => Box::new(format),
1297            Format::Orc(format) => Box::new(format),
1298        },
1299    )
1300}
1301
1302pub async fn show_processlist(
1303    stmt: ShowProcessList,
1304    query_engine: &QueryEngineRef,
1305    catalog_manager: &CatalogManagerRef,
1306    query_ctx: QueryContextRef,
1307) -> Result<Output> {
1308    let projects = if stmt.full {
1309        vec![
1310            (process_list::ID, "Id"),
1311            (process_list::CATALOG, "Catalog"),
1312            (process_list::SCHEMAS, "Schema"),
1313            (process_list::CLIENT, "Client"),
1314            (process_list::FRONTEND, "Frontend"),
1315            (process_list::START_TIMESTAMP, "Start Time"),
1316            (process_list::ELAPSED_TIME, "Elapsed Time"),
1317            (process_list::QUERY, "Query"),
1318        ]
1319    } else {
1320        vec![
1321            (process_list::ID, "Id"),
1322            (process_list::CATALOG, "Catalog"),
1323            (process_list::QUERY, "Query"),
1324            (process_list::ELAPSED_TIME, "Elapsed Time"),
1325        ]
1326    };
1327
1328    let filters = vec![];
1329    let like_field = None;
1330    let sort = vec![col("id").sort(true, true)];
1331    query_from_information_schema_table(
1332        query_engine,
1333        catalog_manager,
1334        query_ctx.clone(),
1335        "process_list",
1336        vec![],
1337        projects.clone(),
1338        filters,
1339        like_field,
1340        sort,
1341        ShowKind::All,
1342    )
1343    .await
1344}
1345
1346#[cfg(test)]
1347mod test {
1348    use std::sync::Arc;
1349
1350    use common_query::{Output, OutputData};
1351    use common_recordbatch::{RecordBatch, RecordBatches};
1352    use common_time::Timezone;
1353    use common_time::timestamp::TimeUnit;
1354    use datatypes::prelude::ConcreteDataType;
1355    use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema, SchemaRef};
1356    use datatypes::vectors::{StringVector, TimestampMillisecondVector, UInt32Vector, VectorRef};
1357    use session::context::QueryContextBuilder;
1358    use snafu::ResultExt;
1359    use sql::ast::{Ident, ObjectName};
1360    use sql::statements::show::ShowVariables;
1361    use table::TableRef;
1362    use table::test_util::MemTable;
1363
1364    use super::show_variable;
1365    use crate::error;
1366    use crate::error::Result;
1367    use crate::sql::{
1368        DESCRIBE_TABLE_OUTPUT_SCHEMA, NO_STR, SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_TIME_INDEX,
1369        YES_STR, describe_table,
1370    };
1371
1372    #[test]
1373    fn test_describe_table_multiple_columns() -> Result<()> {
1374        let table_name = "test_table";
1375        let schema = vec![
1376            ColumnSchema::new("t1", ConcreteDataType::uint32_datatype(), true),
1377            ColumnSchema::new(
1378                "t2",
1379                ConcreteDataType::timestamp_datatype(TimeUnit::Millisecond),
1380                false,
1381            )
1382            .with_default_constraint(Some(ColumnDefaultConstraint::Function(String::from(
1383                "current_timestamp()",
1384            ))))
1385            .unwrap()
1386            .with_time_index(true),
1387        ];
1388        let data = vec![
1389            Arc::new(UInt32Vector::from_slice([0])) as _,
1390            Arc::new(TimestampMillisecondVector::from_slice([0])) as _,
1391        ];
1392        let expected_columns = vec![
1393            Arc::new(StringVector::from(vec!["t1", "t2"])) as _,
1394            Arc::new(StringVector::from(vec!["UInt32", "TimestampMillisecond"])) as _,
1395            Arc::new(StringVector::from(vec!["", "PRI"])) as _,
1396            Arc::new(StringVector::from(vec![YES_STR, NO_STR])) as _,
1397            Arc::new(StringVector::from(vec!["", "current_timestamp()"])) as _,
1398            Arc::new(StringVector::from(vec![
1399                SEMANTIC_TYPE_FIELD,
1400                SEMANTIC_TYPE_TIME_INDEX,
1401            ])) as _,
1402        ];
1403
1404        describe_table_test_by_schema(table_name, schema, data, expected_columns)
1405    }
1406
1407    fn describe_table_test_by_schema(
1408        table_name: &str,
1409        schema: Vec<ColumnSchema>,
1410        data: Vec<VectorRef>,
1411        expected_columns: Vec<VectorRef>,
1412    ) -> Result<()> {
1413        let table_schema = SchemaRef::new(Schema::new(schema));
1414        let table = prepare_describe_table(table_name, table_schema, data);
1415
1416        let expected =
1417            RecordBatches::try_from_columns(DESCRIBE_TABLE_OUTPUT_SCHEMA.clone(), expected_columns)
1418                .context(error::CreateRecordBatchSnafu)?;
1419
1420        if let OutputData::RecordBatches(res) = describe_table(table)?.data {
1421            assert_eq!(res.take(), expected.take());
1422        } else {
1423            panic!("describe table must return record batch");
1424        }
1425
1426        Ok(())
1427    }
1428
1429    fn prepare_describe_table(
1430        table_name: &str,
1431        table_schema: SchemaRef,
1432        data: Vec<VectorRef>,
1433    ) -> TableRef {
1434        let record_batch = RecordBatch::new(table_schema, data).unwrap();
1435        MemTable::table(table_name, record_batch)
1436    }
1437
1438    #[test]
1439    fn test_show_variable() {
1440        assert_eq!(
1441            exec_show_variable("SYSTEM_TIME_ZONE", "Asia/Shanghai").unwrap(),
1442            "UTC"
1443        );
1444        assert_eq!(
1445            exec_show_variable("SYSTEM_TIMEZONE", "Asia/Shanghai").unwrap(),
1446            "UTC"
1447        );
1448        assert_eq!(
1449            exec_show_variable("TIME_ZONE", "Asia/Shanghai").unwrap(),
1450            "Asia/Shanghai"
1451        );
1452        assert_eq!(
1453            exec_show_variable("TIMEZONE", "Asia/Shanghai").unwrap(),
1454            "Asia/Shanghai"
1455        );
1456        assert!(exec_show_variable("TIME ZONE", "Asia/Shanghai").is_err());
1457        assert!(exec_show_variable("SYSTEM TIME ZONE", "Asia/Shanghai").is_err());
1458    }
1459
1460    fn exec_show_variable(variable: &str, tz: &str) -> Result<String> {
1461        let stmt = ShowVariables {
1462            variable: ObjectName::from(vec![Ident::new(variable)]),
1463        };
1464        let ctx = Arc::new(
1465            QueryContextBuilder::default()
1466                .timezone(Timezone::from_tz_string(tz).unwrap())
1467                .build(),
1468        );
1469        match show_variable(stmt, ctx) {
1470            Ok(Output {
1471                data: OutputData::RecordBatches(record),
1472                ..
1473            }) => {
1474                let record = record.take().first().cloned().unwrap();
1475                Ok(record.iter_column_as_string(0).next().unwrap().unwrap())
1476            }
1477            Ok(_) => unreachable!(),
1478            Err(e) => Err(e),
1479        }
1480    }
1481}