query/
sql.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod show_create_table;
16
17use std::collections::HashMap;
18use std::ops::ControlFlow;
19use std::sync::Arc;
20
21use catalog::CatalogManagerRef;
22use catalog::information_schema::{
23    CHARACTER_SETS, COLLATIONS, COLUMNS, FLOWS, KEY_COLUMN_USAGE, REGION_PEERS, SCHEMATA, TABLES,
24    VIEWS, columns, flows, key_column_usage, process_list, region_peers, schemata, tables,
25};
26use common_catalog::consts::{
27    INFORMATION_SCHEMA_NAME, SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_PRIMARY_KEY,
28    SEMANTIC_TYPE_TIME_INDEX,
29};
30use common_catalog::format_full_table_name;
31use common_datasource::file_format::{FileFormat, Format, infer_schemas};
32use common_datasource::lister::{Lister, Source};
33use common_datasource::object_store::build_backend;
34use common_datasource::util::find_dir_and_filename;
35use common_meta::SchemaOptions;
36use common_meta::key::flow::flow_info::FlowInfoValue;
37use common_query::Output;
38use common_query::prelude::greptime_timestamp;
39use common_recordbatch::RecordBatches;
40use common_recordbatch::adapter::RecordBatchStreamAdapter;
41use common_time::Timestamp;
42use common_time::timezone::get_timezone;
43use datafusion::common::ScalarValue;
44use datafusion::prelude::SessionContext;
45use datafusion_expr::{Expr, SortExpr, case, col, lit};
46use datatypes::prelude::*;
47use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, RawSchema, Schema};
48use datatypes::vectors::StringVector;
49use itertools::Itertools;
50use object_store::ObjectStore;
51use once_cell::sync::Lazy;
52use regex::Regex;
53use session::context::{Channel, QueryContextRef};
54pub use show_create_table::create_table_stmt;
55use snafu::{OptionExt, ResultExt, ensure};
56use sql::ast::{Ident, visit_expressions_mut};
57use sql::parser::ParserContext;
58use sql::statements::OptionMap;
59use sql::statements::create::{CreateDatabase, CreateFlow, CreateView, Partitions, SqlOrTql};
60use sql::statements::show::{
61    ShowColumns, ShowDatabases, ShowFlows, ShowIndex, ShowKind, ShowProcessList, ShowRegion,
62    ShowTableStatus, ShowTables, ShowVariables, ShowViews,
63};
64use sql::statements::statement::Statement;
65use sqlparser::ast::ObjectName;
66use store_api::metric_engine_consts::{is_metric_engine, is_metric_engine_internal_column};
67use table::TableRef;
68use table::metadata::TableInfoRef;
69use table::requests::{FILE_TABLE_LOCATION_KEY, FILE_TABLE_PATTERN_KEY};
70
71use crate::QueryEngineRef;
72use crate::error::{self, Result, UnsupportedVariableSnafu};
73use crate::planner::DfLogicalPlanner;
74
75const SCHEMAS_COLUMN: &str = "Database";
76const OPTIONS_COLUMN: &str = "Options";
77const VIEWS_COLUMN: &str = "Views";
78const FLOWS_COLUMN: &str = "Flows";
79const FIELD_COLUMN: &str = "Field";
80const TABLE_TYPE_COLUMN: &str = "Table_type";
81const COLUMN_NAME_COLUMN: &str = "Column";
82const COLUMN_GREPTIME_TYPE_COLUMN: &str = "Greptime_type";
83const COLUMN_TYPE_COLUMN: &str = "Type";
84const COLUMN_KEY_COLUMN: &str = "Key";
85const COLUMN_EXTRA_COLUMN: &str = "Extra";
86const COLUMN_PRIVILEGES_COLUMN: &str = "Privileges";
87const COLUMN_COLLATION_COLUMN: &str = "Collation";
88const COLUMN_NULLABLE_COLUMN: &str = "Null";
89const COLUMN_DEFAULT_COLUMN: &str = "Default";
90const COLUMN_COMMENT_COLUMN: &str = "Comment";
91const COLUMN_SEMANTIC_TYPE_COLUMN: &str = "Semantic Type";
92
93const YES_STR: &str = "YES";
94const NO_STR: &str = "NO";
95const PRI_KEY: &str = "PRI";
96const TIME_INDEX: &str = "TIME INDEX";
97
98/// SHOW index columns
99const INDEX_TABLE_COLUMN: &str = "Table";
100const INDEX_NONT_UNIQUE_COLUMN: &str = "Non_unique";
101const INDEX_CARDINALITY_COLUMN: &str = "Cardinality";
102const INDEX_SUB_PART_COLUMN: &str = "Sub_part";
103const INDEX_PACKED_COLUMN: &str = "Packed";
104const INDEX_INDEX_TYPE_COLUMN: &str = "Index_type";
105const INDEX_COMMENT_COLUMN: &str = "Index_comment";
106const INDEX_VISIBLE_COLUMN: &str = "Visible";
107const INDEX_EXPRESSION_COLUMN: &str = "Expression";
108const INDEX_KEY_NAME_COLUMN: &str = "Key_name";
109const INDEX_SEQ_IN_INDEX_COLUMN: &str = "Seq_in_index";
110const INDEX_COLUMN_NAME_COLUMN: &str = "Column_name";
111
112static DESCRIBE_TABLE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
113    Arc::new(Schema::new(vec![
114        ColumnSchema::new(
115            COLUMN_NAME_COLUMN,
116            ConcreteDataType::string_datatype(),
117            false,
118        ),
119        ColumnSchema::new(
120            COLUMN_TYPE_COLUMN,
121            ConcreteDataType::string_datatype(),
122            false,
123        ),
124        ColumnSchema::new(COLUMN_KEY_COLUMN, ConcreteDataType::string_datatype(), true),
125        ColumnSchema::new(
126            COLUMN_NULLABLE_COLUMN,
127            ConcreteDataType::string_datatype(),
128            false,
129        ),
130        ColumnSchema::new(
131            COLUMN_DEFAULT_COLUMN,
132            ConcreteDataType::string_datatype(),
133            false,
134        ),
135        ColumnSchema::new(
136            COLUMN_SEMANTIC_TYPE_COLUMN,
137            ConcreteDataType::string_datatype(),
138            false,
139        ),
140    ]))
141});
142
143static SHOW_CREATE_DATABASE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
144    Arc::new(Schema::new(vec![
145        ColumnSchema::new("Database", ConcreteDataType::string_datatype(), false),
146        ColumnSchema::new(
147            "Create Database",
148            ConcreteDataType::string_datatype(),
149            false,
150        ),
151    ]))
152});
153
154static SHOW_CREATE_TABLE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
155    Arc::new(Schema::new(vec![
156        ColumnSchema::new("Table", ConcreteDataType::string_datatype(), false),
157        ColumnSchema::new("Create Table", ConcreteDataType::string_datatype(), false),
158    ]))
159});
160
161static SHOW_CREATE_FLOW_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
162    Arc::new(Schema::new(vec![
163        ColumnSchema::new("Flow", ConcreteDataType::string_datatype(), false),
164        ColumnSchema::new("Create Flow", ConcreteDataType::string_datatype(), false),
165    ]))
166});
167
168static SHOW_CREATE_VIEW_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
169    Arc::new(Schema::new(vec![
170        ColumnSchema::new("View", ConcreteDataType::string_datatype(), false),
171        ColumnSchema::new("Create View", ConcreteDataType::string_datatype(), false),
172    ]))
173});
174
175fn null() -> Expr {
176    lit(ScalarValue::Null)
177}
178
179pub async fn show_databases(
180    stmt: ShowDatabases,
181    query_engine: &QueryEngineRef,
182    catalog_manager: &CatalogManagerRef,
183    query_ctx: QueryContextRef,
184) -> Result<Output> {
185    let projects = if stmt.full {
186        vec![
187            (schemata::SCHEMA_NAME, SCHEMAS_COLUMN),
188            (schemata::SCHEMA_OPTS, OPTIONS_COLUMN),
189        ]
190    } else {
191        vec![(schemata::SCHEMA_NAME, SCHEMAS_COLUMN)]
192    };
193
194    let filters = vec![col(schemata::CATALOG_NAME).eq(lit(query_ctx.current_catalog()))];
195    let like_field = Some(schemata::SCHEMA_NAME);
196    let sort = vec![col(schemata::SCHEMA_NAME).sort(true, true)];
197
198    query_from_information_schema_table(
199        query_engine,
200        catalog_manager,
201        query_ctx,
202        SCHEMATA,
203        vec![],
204        projects,
205        filters,
206        like_field,
207        sort,
208        stmt.kind,
209    )
210    .await
211}
212
213/// Replaces column identifier references in a SQL expression.
214/// Used for backward compatibility where old column names should work with new ones.
215fn replace_column_in_expr(expr: &mut sqlparser::ast::Expr, from_column: &str, to_column: &str) {
216    let _ = visit_expressions_mut(expr, |e| {
217        match e {
218            sqlparser::ast::Expr::Identifier(ident) => {
219                if ident.value.eq_ignore_ascii_case(from_column) {
220                    ident.value = to_column.to_string();
221                }
222            }
223            sqlparser::ast::Expr::CompoundIdentifier(idents) => {
224                if let Some(last) = idents.last_mut()
225                    && last.value.eq_ignore_ascii_case(from_column)
226                {
227                    last.value = to_column.to_string();
228                }
229            }
230            _ => {}
231        }
232        ControlFlow::<()>::Continue(())
233    });
234}
235
236/// Cast a `show` statement execution into a query from tables in  `information_schema`.
237/// - `table_name`: the table name in `information_schema`,
238/// - `projects`: query projection, a list of `(column, renamed_column)`,
239/// - `filters`: filter expressions for query,
240/// - `like_field`: the field to filter by the predicate `ShowKind::Like`,
241/// - `sort`: sort the results by the specified sorting expressions,
242/// - `kind`: the show kind
243#[allow(clippy::too_many_arguments)]
244async fn query_from_information_schema_table(
245    query_engine: &QueryEngineRef,
246    catalog_manager: &CatalogManagerRef,
247    query_ctx: QueryContextRef,
248    table_name: &str,
249    select: Vec<Expr>,
250    projects: Vec<(&str, &str)>,
251    filters: Vec<Expr>,
252    like_field: Option<&str>,
253    sort: Vec<SortExpr>,
254    kind: ShowKind,
255) -> Result<Output> {
256    let table = catalog_manager
257        .table(
258            query_ctx.current_catalog(),
259            INFORMATION_SCHEMA_NAME,
260            table_name,
261            Some(&query_ctx),
262        )
263        .await
264        .context(error::CatalogSnafu)?
265        .with_context(|| error::TableNotFoundSnafu {
266            table: format_full_table_name(
267                query_ctx.current_catalog(),
268                INFORMATION_SCHEMA_NAME,
269                table_name,
270            ),
271        })?;
272
273    let dataframe = query_engine.read_table(table)?;
274
275    // Apply filters
276    let dataframe = filters.into_iter().try_fold(dataframe, |df, expr| {
277        df.filter(expr).context(error::PlanSqlSnafu)
278    })?;
279
280    // Apply `like` predicate if exists
281    let dataframe = if let (ShowKind::Like(ident), Some(field)) = (&kind, like_field) {
282        dataframe
283            .filter(col(field).like(lit(ident.value.clone())))
284            .context(error::PlanSqlSnafu)?
285    } else {
286        dataframe
287    };
288
289    // Apply sorting
290    let dataframe = if sort.is_empty() {
291        dataframe
292    } else {
293        dataframe.sort(sort).context(error::PlanSqlSnafu)?
294    };
295
296    // Apply select
297    let dataframe = if select.is_empty() {
298        if projects.is_empty() {
299            dataframe
300        } else {
301            let projection = projects
302                .iter()
303                .map(|x| col(x.0).alias(x.1))
304                .collect::<Vec<_>>();
305            dataframe.select(projection).context(error::PlanSqlSnafu)?
306        }
307    } else {
308        dataframe.select(select).context(error::PlanSqlSnafu)?
309    };
310
311    // Apply projection
312    let dataframe = projects
313        .into_iter()
314        .try_fold(dataframe, |df, (column, renamed_column)| {
315            df.with_column_renamed(column, renamed_column)
316                .context(error::PlanSqlSnafu)
317        })?;
318
319    let dataframe = match kind {
320        ShowKind::All | ShowKind::Like(_) => {
321            // Like kind is processed above
322            dataframe
323        }
324        ShowKind::Where(filter) => {
325            // Cast the results into VIEW for `where` clause,
326            // which is evaluated against the column names displayed by the SHOW statement.
327            let view = dataframe.into_view();
328            let dataframe = SessionContext::new_with_state(
329                query_engine
330                    .engine_context(query_ctx.clone())
331                    .state()
332                    .clone(),
333            )
334            .read_table(view)?;
335
336            let planner = query_engine.planner();
337            let planner = planner
338                .as_any()
339                .downcast_ref::<DfLogicalPlanner>()
340                .expect("Must be the datafusion planner");
341
342            let filter = planner
343                .sql_to_expr(filter, dataframe.schema(), false, query_ctx)
344                .await?;
345
346            // Apply the `where` clause filters
347            dataframe.filter(filter).context(error::PlanSqlSnafu)?
348        }
349    };
350
351    let stream = dataframe.execute_stream().await?;
352
353    Ok(Output::new_with_stream(Box::pin(
354        RecordBatchStreamAdapter::try_new(stream).context(error::CreateRecordBatchSnafu)?,
355    )))
356}
357
358/// Execute `SHOW COLUMNS` statement.
359pub async fn show_columns(
360    stmt: ShowColumns,
361    query_engine: &QueryEngineRef,
362    catalog_manager: &CatalogManagerRef,
363    query_ctx: QueryContextRef,
364) -> Result<Output> {
365    let schema_name = if let Some(database) = stmt.database {
366        database
367    } else {
368        query_ctx.current_schema()
369    };
370
371    let projects = if stmt.full {
372        vec![
373            (columns::COLUMN_NAME, FIELD_COLUMN),
374            (columns::DATA_TYPE, COLUMN_TYPE_COLUMN),
375            (columns::COLLATION_NAME, COLUMN_COLLATION_COLUMN),
376            (columns::IS_NULLABLE, COLUMN_NULLABLE_COLUMN),
377            (columns::COLUMN_KEY, COLUMN_KEY_COLUMN),
378            (columns::COLUMN_DEFAULT, COLUMN_DEFAULT_COLUMN),
379            (columns::COLUMN_COMMENT, COLUMN_COMMENT_COLUMN),
380            (columns::PRIVILEGES, COLUMN_PRIVILEGES_COLUMN),
381            (columns::EXTRA, COLUMN_EXTRA_COLUMN),
382            (columns::GREPTIME_DATA_TYPE, COLUMN_GREPTIME_TYPE_COLUMN),
383        ]
384    } else {
385        vec![
386            (columns::COLUMN_NAME, FIELD_COLUMN),
387            (columns::DATA_TYPE, COLUMN_TYPE_COLUMN),
388            (columns::IS_NULLABLE, COLUMN_NULLABLE_COLUMN),
389            (columns::COLUMN_KEY, COLUMN_KEY_COLUMN),
390            (columns::COLUMN_DEFAULT, COLUMN_DEFAULT_COLUMN),
391            (columns::EXTRA, COLUMN_EXTRA_COLUMN),
392            (columns::GREPTIME_DATA_TYPE, COLUMN_GREPTIME_TYPE_COLUMN),
393        ]
394    };
395
396    let filters = vec![
397        col(columns::TABLE_NAME).eq(lit(&stmt.table)),
398        col(columns::TABLE_SCHEMA).eq(lit(schema_name.clone())),
399        col(columns::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
400    ];
401    let like_field = Some(columns::COLUMN_NAME);
402    let sort = vec![col(columns::COLUMN_NAME).sort(true, true)];
403
404    query_from_information_schema_table(
405        query_engine,
406        catalog_manager,
407        query_ctx,
408        COLUMNS,
409        vec![],
410        projects,
411        filters,
412        like_field,
413        sort,
414        stmt.kind,
415    )
416    .await
417}
418
419/// Execute `SHOW INDEX` statement.
420pub async fn show_index(
421    stmt: ShowIndex,
422    query_engine: &QueryEngineRef,
423    catalog_manager: &CatalogManagerRef,
424    query_ctx: QueryContextRef,
425) -> Result<Output> {
426    let schema_name = if let Some(database) = stmt.database {
427        database
428    } else {
429        query_ctx.current_schema()
430    };
431
432    let select = vec![
433        col(key_column_usage::TABLE_NAME).alias(INDEX_TABLE_COLUMN),
434        // 1 as `Non_unique`: contain duplicates
435        lit(1).alias(INDEX_NONT_UNIQUE_COLUMN),
436        col(key_column_usage::CONSTRAINT_NAME).alias(INDEX_KEY_NAME_COLUMN),
437        col(key_column_usage::ORDINAL_POSITION).alias(INDEX_SEQ_IN_INDEX_COLUMN),
438        col(key_column_usage::COLUMN_NAME).alias(INDEX_COLUMN_NAME_COLUMN),
439        // How the column is sorted in the index: A (ascending).
440        lit("A").alias(COLUMN_COLLATION_COLUMN),
441        null().alias(INDEX_CARDINALITY_COLUMN),
442        null().alias(INDEX_SUB_PART_COLUMN),
443        null().alias(INDEX_PACKED_COLUMN),
444        // case `constraint_name`
445        //    when 'TIME INDEX' then 'NO'
446        //    else 'YES'
447        // end as `Null`
448        case(col(key_column_usage::CONSTRAINT_NAME))
449            .when(lit(TIME_INDEX), lit(NO_STR))
450            .otherwise(lit(YES_STR))
451            .context(error::PlanSqlSnafu)?
452            .alias(COLUMN_NULLABLE_COLUMN),
453        col(key_column_usage::GREPTIME_INDEX_TYPE).alias(INDEX_INDEX_TYPE_COLUMN),
454        lit("").alias(COLUMN_COMMENT_COLUMN),
455        lit("").alias(INDEX_COMMENT_COLUMN),
456        lit(YES_STR).alias(INDEX_VISIBLE_COLUMN),
457        null().alias(INDEX_EXPRESSION_COLUMN),
458    ];
459
460    let projects = vec![
461        (key_column_usage::TABLE_NAME, INDEX_TABLE_COLUMN),
462        (INDEX_NONT_UNIQUE_COLUMN, INDEX_NONT_UNIQUE_COLUMN),
463        (key_column_usage::CONSTRAINT_NAME, INDEX_KEY_NAME_COLUMN),
464        (
465            key_column_usage::ORDINAL_POSITION,
466            INDEX_SEQ_IN_INDEX_COLUMN,
467        ),
468        (key_column_usage::COLUMN_NAME, INDEX_COLUMN_NAME_COLUMN),
469        (COLUMN_COLLATION_COLUMN, COLUMN_COLLATION_COLUMN),
470        (INDEX_CARDINALITY_COLUMN, INDEX_CARDINALITY_COLUMN),
471        (INDEX_SUB_PART_COLUMN, INDEX_SUB_PART_COLUMN),
472        (INDEX_PACKED_COLUMN, INDEX_PACKED_COLUMN),
473        (COLUMN_NULLABLE_COLUMN, COLUMN_NULLABLE_COLUMN),
474        (
475            key_column_usage::GREPTIME_INDEX_TYPE,
476            INDEX_INDEX_TYPE_COLUMN,
477        ),
478        (COLUMN_COMMENT_COLUMN, COLUMN_COMMENT_COLUMN),
479        (INDEX_COMMENT_COLUMN, INDEX_COMMENT_COLUMN),
480        (INDEX_VISIBLE_COLUMN, INDEX_VISIBLE_COLUMN),
481        (INDEX_EXPRESSION_COLUMN, INDEX_EXPRESSION_COLUMN),
482    ];
483
484    let filters = vec![
485        col(key_column_usage::TABLE_NAME).eq(lit(&stmt.table)),
486        col(key_column_usage::TABLE_SCHEMA).eq(lit(schema_name.clone())),
487        col(key_column_usage::REAL_TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
488    ];
489    let like_field = None;
490    let sort = vec![col(columns::COLUMN_NAME).sort(true, true)];
491
492    query_from_information_schema_table(
493        query_engine,
494        catalog_manager,
495        query_ctx,
496        KEY_COLUMN_USAGE,
497        select,
498        projects,
499        filters,
500        like_field,
501        sort,
502        stmt.kind,
503    )
504    .await
505}
506
507/// Execute `SHOW REGION` statement.
508pub async fn show_region(
509    stmt: ShowRegion,
510    query_engine: &QueryEngineRef,
511    catalog_manager: &CatalogManagerRef,
512    query_ctx: QueryContextRef,
513) -> Result<Output> {
514    let schema_name = if let Some(database) = stmt.database {
515        database
516    } else {
517        query_ctx.current_schema()
518    };
519
520    let filters = vec![
521        col(region_peers::TABLE_NAME).eq(lit(&stmt.table)),
522        col(region_peers::TABLE_SCHEMA).eq(lit(schema_name.clone())),
523        col(region_peers::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
524    ];
525    let projects = vec![
526        (region_peers::TABLE_NAME, "Table"),
527        (region_peers::REGION_ID, "Region"),
528        (region_peers::PEER_ID, "Peer"),
529        (region_peers::IS_LEADER, "Leader"),
530    ];
531
532    let like_field = None;
533    let sort = vec![
534        col(columns::REGION_ID).sort(true, true),
535        col(columns::PEER_ID).sort(true, true),
536    ];
537
538    query_from_information_schema_table(
539        query_engine,
540        catalog_manager,
541        query_ctx,
542        REGION_PEERS,
543        vec![],
544        projects,
545        filters,
546        like_field,
547        sort,
548        stmt.kind,
549    )
550    .await
551}
552
553/// Execute [`ShowTables`] statement and return the [`Output`] if success.
554pub async fn show_tables(
555    stmt: ShowTables,
556    query_engine: &QueryEngineRef,
557    catalog_manager: &CatalogManagerRef,
558    query_ctx: QueryContextRef,
559) -> Result<Output> {
560    let schema_name = if let Some(database) = stmt.database {
561        database
562    } else {
563        query_ctx.current_schema()
564    };
565
566    // MySQL renames `table_name` to `Tables_in_{schema}` for protocol compatibility
567    let tables_column = format!("Tables_in_{}", schema_name);
568    let projects = if stmt.full {
569        vec![
570            (tables::TABLE_NAME, tables_column.as_str()),
571            (tables::TABLE_TYPE, TABLE_TYPE_COLUMN),
572        ]
573    } else {
574        vec![(tables::TABLE_NAME, tables_column.as_str())]
575    };
576    let filters = vec![
577        col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
578        col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
579    ];
580    let like_field = Some(tables::TABLE_NAME);
581    let sort = vec![col(tables::TABLE_NAME).sort(true, true)];
582
583    // Transform the WHERE clause for backward compatibility:
584    // Replace "Tables" with "Tables_in_{schema}" to support old queries
585    let kind = match stmt.kind {
586        ShowKind::Where(mut filter) => {
587            replace_column_in_expr(&mut filter, "Tables", &tables_column);
588            ShowKind::Where(filter)
589        }
590        other => other,
591    };
592
593    query_from_information_schema_table(
594        query_engine,
595        catalog_manager,
596        query_ctx,
597        TABLES,
598        vec![],
599        projects,
600        filters,
601        like_field,
602        sort,
603        kind,
604    )
605    .await
606}
607
608/// Execute [`ShowTableStatus`] statement and return the [`Output`] if success.
609pub async fn show_table_status(
610    stmt: ShowTableStatus,
611    query_engine: &QueryEngineRef,
612    catalog_manager: &CatalogManagerRef,
613    query_ctx: QueryContextRef,
614) -> Result<Output> {
615    let schema_name = if let Some(database) = stmt.database {
616        database
617    } else {
618        query_ctx.current_schema()
619    };
620
621    // Refer to https://dev.mysql.com/doc/refman/8.4/en/show-table-status.html
622    let projects = vec![
623        (tables::TABLE_NAME, "Name"),
624        (tables::ENGINE, "Engine"),
625        (tables::VERSION, "Version"),
626        (tables::ROW_FORMAT, "Row_format"),
627        (tables::TABLE_ROWS, "Rows"),
628        (tables::AVG_ROW_LENGTH, "Avg_row_length"),
629        (tables::DATA_LENGTH, "Data_length"),
630        (tables::MAX_DATA_LENGTH, "Max_data_length"),
631        (tables::INDEX_LENGTH, "Index_length"),
632        (tables::DATA_FREE, "Data_free"),
633        (tables::AUTO_INCREMENT, "Auto_increment"),
634        (tables::CREATE_TIME, "Create_time"),
635        (tables::UPDATE_TIME, "Update_time"),
636        (tables::CHECK_TIME, "Check_time"),
637        (tables::TABLE_COLLATION, "Collation"),
638        (tables::CHECKSUM, "Checksum"),
639        (tables::CREATE_OPTIONS, "Create_options"),
640        (tables::TABLE_COMMENT, "Comment"),
641    ];
642
643    let filters = vec![
644        col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
645        col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
646    ];
647    let like_field = Some(tables::TABLE_NAME);
648    let sort = vec![col(tables::TABLE_NAME).sort(true, true)];
649
650    query_from_information_schema_table(
651        query_engine,
652        catalog_manager,
653        query_ctx,
654        TABLES,
655        vec![],
656        projects,
657        filters,
658        like_field,
659        sort,
660        stmt.kind,
661    )
662    .await
663}
664
665/// Execute `SHOW COLLATION` statement and returns the `Output` if success.
666pub async fn show_collations(
667    kind: ShowKind,
668    query_engine: &QueryEngineRef,
669    catalog_manager: &CatalogManagerRef,
670    query_ctx: QueryContextRef,
671) -> Result<Output> {
672    // Refer to https://dev.mysql.com/doc/refman/8.0/en/show-collation.html
673    let projects = vec![
674        ("collation_name", "Collation"),
675        ("character_set_name", "Charset"),
676        ("id", "Id"),
677        ("is_default", "Default"),
678        ("is_compiled", "Compiled"),
679        ("sortlen", "Sortlen"),
680    ];
681
682    let filters = vec![];
683    let like_field = Some("collation_name");
684    let sort = vec![];
685
686    query_from_information_schema_table(
687        query_engine,
688        catalog_manager,
689        query_ctx,
690        COLLATIONS,
691        vec![],
692        projects,
693        filters,
694        like_field,
695        sort,
696        kind,
697    )
698    .await
699}
700
701/// Execute `SHOW CHARSET` statement and returns the `Output` if success.
702pub async fn show_charsets(
703    kind: ShowKind,
704    query_engine: &QueryEngineRef,
705    catalog_manager: &CatalogManagerRef,
706    query_ctx: QueryContextRef,
707) -> Result<Output> {
708    // Refer to https://dev.mysql.com/doc/refman/8.0/en/show-character-set.html
709    let projects = vec![
710        ("character_set_name", "Charset"),
711        ("description", "Description"),
712        ("default_collate_name", "Default collation"),
713        ("maxlen", "Maxlen"),
714    ];
715
716    let filters = vec![];
717    let like_field = Some("character_set_name");
718    let sort = vec![];
719
720    query_from_information_schema_table(
721        query_engine,
722        catalog_manager,
723        query_ctx,
724        CHARACTER_SETS,
725        vec![],
726        projects,
727        filters,
728        like_field,
729        sort,
730        kind,
731    )
732    .await
733}
734
735pub fn show_variable(stmt: ShowVariables, query_ctx: QueryContextRef) -> Result<Output> {
736    let variable = stmt.variable.to_string().to_uppercase();
737    let value = match variable.as_str() {
738        "SYSTEM_TIME_ZONE" | "SYSTEM_TIMEZONE" => get_timezone(None).to_string(),
739        "TIME_ZONE" | "TIMEZONE" => query_ctx.timezone().to_string(),
740        "READ_PREFERENCE" => query_ctx.read_preference().to_string(),
741        "DATESTYLE" => {
742            let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style();
743            format!("{}, {}", style, order)
744        }
745        "INTERVALSTYLE" => {
746            let style = *query_ctx
747                .configuration_parameter()
748                .pg_intervalstyle_format();
749            style.to_string()
750        }
751        "MAX_EXECUTION_TIME" => {
752            if query_ctx.channel() == Channel::Mysql {
753                query_ctx.query_timeout_as_millis().to_string()
754            } else {
755                return UnsupportedVariableSnafu { name: variable }.fail();
756            }
757        }
758        "STATEMENT_TIMEOUT" => {
759            // Add time units to postgres query timeout display.
760            if query_ctx.channel() == Channel::Postgres {
761                let mut timeout = query_ctx.query_timeout_as_millis().to_string();
762                timeout.push_str("ms");
763                timeout
764            } else {
765                return UnsupportedVariableSnafu { name: variable }.fail();
766            }
767        }
768        _ => return UnsupportedVariableSnafu { name: variable }.fail(),
769    };
770    let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
771        variable,
772        ConcreteDataType::string_datatype(),
773        false,
774    )]));
775    let records = RecordBatches::try_from_columns(
776        schema,
777        vec![Arc::new(StringVector::from(vec![value])) as _],
778    )
779    .context(error::CreateRecordBatchSnafu)?;
780    Ok(Output::new_with_record_batches(records))
781}
782
783pub async fn show_status(_query_ctx: QueryContextRef) -> Result<Output> {
784    let schema = Arc::new(Schema::new(vec![
785        ColumnSchema::new("Variable_name", ConcreteDataType::string_datatype(), false),
786        ColumnSchema::new("Value", ConcreteDataType::string_datatype(), true),
787    ]));
788    let records = RecordBatches::try_from_columns(
789        schema,
790        vec![
791            Arc::new(StringVector::from(Vec::<&str>::new())) as _,
792            Arc::new(StringVector::from(Vec::<&str>::new())) as _,
793        ],
794    )
795    .context(error::CreateRecordBatchSnafu)?;
796    Ok(Output::new_with_record_batches(records))
797}
798
799pub async fn show_search_path(_query_ctx: QueryContextRef) -> Result<Output> {
800    let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
801        "search_path",
802        ConcreteDataType::string_datatype(),
803        false,
804    )]));
805    let records = RecordBatches::try_from_columns(
806        schema,
807        vec![Arc::new(StringVector::from(vec![_query_ctx.current_schema()])) as _],
808    )
809    .context(error::CreateRecordBatchSnafu)?;
810    Ok(Output::new_with_record_batches(records))
811}
812
813pub fn show_create_database(database_name: &str, options: OptionMap) -> Result<Output> {
814    let stmt = CreateDatabase {
815        name: ObjectName::from(vec![Ident::new(database_name)]),
816        if_not_exists: true,
817        options,
818    };
819    let sql = format!("{stmt}");
820    let columns = vec![
821        Arc::new(StringVector::from(vec![database_name.to_string()])) as _,
822        Arc::new(StringVector::from(vec![sql])) as _,
823    ];
824    let records =
825        RecordBatches::try_from_columns(SHOW_CREATE_DATABASE_OUTPUT_SCHEMA.clone(), columns)
826            .context(error::CreateRecordBatchSnafu)?;
827    Ok(Output::new_with_record_batches(records))
828}
829
830pub fn show_create_table(
831    table_info: TableInfoRef,
832    schema_options: Option<SchemaOptions>,
833    partitions: Option<Partitions>,
834    query_ctx: QueryContextRef,
835) -> Result<Output> {
836    let table_name = table_info.name.clone();
837
838    let quote_style = query_ctx.quote_style();
839
840    let mut stmt = create_table_stmt(&table_info, schema_options, quote_style)?;
841    stmt.partitions = partitions.map(|mut p| {
842        p.set_quote(quote_style);
843        p
844    });
845    let sql = format!("{}", stmt);
846    let columns = vec![
847        Arc::new(StringVector::from(vec![table_name])) as _,
848        Arc::new(StringVector::from(vec![sql])) as _,
849    ];
850    let records = RecordBatches::try_from_columns(SHOW_CREATE_TABLE_OUTPUT_SCHEMA.clone(), columns)
851        .context(error::CreateRecordBatchSnafu)?;
852
853    Ok(Output::new_with_record_batches(records))
854}
855
856pub fn show_create_foreign_table_for_pg(
857    table: TableRef,
858    _query_ctx: QueryContextRef,
859) -> Result<Output> {
860    let table_info = table.table_info();
861
862    let table_meta = &table_info.meta;
863    let table_name = &table_info.name;
864    let schema = &table_info.meta.schema;
865    let is_metric_engine = is_metric_engine(&table_meta.engine);
866
867    let columns = schema
868        .column_schemas()
869        .iter()
870        .filter_map(|c| {
871            if is_metric_engine && is_metric_engine_internal_column(&c.name) {
872                None
873            } else {
874                Some(format!(
875                    "\"{}\" {}",
876                    c.name,
877                    c.data_type.postgres_datatype_name()
878                ))
879            }
880        })
881        .join(",\n  ");
882
883    let sql = format!(
884        r#"CREATE FOREIGN TABLE ft_{} (
885  {}
886)
887SERVER greptimedb
888OPTIONS (table_name '{}')"#,
889        table_name, columns, table_name
890    );
891
892    let columns = vec![
893        Arc::new(StringVector::from(vec![table_name.clone()])) as _,
894        Arc::new(StringVector::from(vec![sql])) as _,
895    ];
896    let records = RecordBatches::try_from_columns(SHOW_CREATE_TABLE_OUTPUT_SCHEMA.clone(), columns)
897        .context(error::CreateRecordBatchSnafu)?;
898
899    Ok(Output::new_with_record_batches(records))
900}
901
902pub fn show_create_view(
903    view_name: ObjectName,
904    definition: &str,
905    query_ctx: QueryContextRef,
906) -> Result<Output> {
907    let mut parser_ctx =
908        ParserContext::new(query_ctx.sql_dialect(), definition).context(error::SqlSnafu)?;
909
910    let Statement::CreateView(create_view) =
911        parser_ctx.parse_statement().context(error::SqlSnafu)?
912    else {
913        // MUST be `CreateView` statement.
914        unreachable!();
915    };
916
917    let stmt = CreateView {
918        name: view_name.clone(),
919        columns: create_view.columns,
920        query: create_view.query,
921        or_replace: create_view.or_replace,
922        if_not_exists: create_view.if_not_exists,
923    };
924
925    let sql = format!("{}", stmt);
926    let columns = vec![
927        Arc::new(StringVector::from(vec![view_name.to_string()])) as _,
928        Arc::new(StringVector::from(vec![sql])) as _,
929    ];
930    let records = RecordBatches::try_from_columns(SHOW_CREATE_VIEW_OUTPUT_SCHEMA.clone(), columns)
931        .context(error::CreateRecordBatchSnafu)?;
932
933    Ok(Output::new_with_record_batches(records))
934}
935
936/// Execute [`ShowViews`] statement and return the [`Output`] if success.
937pub async fn show_views(
938    stmt: ShowViews,
939    query_engine: &QueryEngineRef,
940    catalog_manager: &CatalogManagerRef,
941    query_ctx: QueryContextRef,
942) -> Result<Output> {
943    let schema_name = if let Some(database) = stmt.database {
944        database
945    } else {
946        query_ctx.current_schema()
947    };
948
949    let projects = vec![(tables::TABLE_NAME, VIEWS_COLUMN)];
950    let filters = vec![
951        col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
952        col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
953    ];
954    let like_field = Some(tables::TABLE_NAME);
955    let sort = vec![col(tables::TABLE_NAME).sort(true, true)];
956
957    query_from_information_schema_table(
958        query_engine,
959        catalog_manager,
960        query_ctx,
961        VIEWS,
962        vec![],
963        projects,
964        filters,
965        like_field,
966        sort,
967        stmt.kind,
968    )
969    .await
970}
971
972/// Execute [`ShowFlows`] statement and return the [`Output`] if success.
973pub async fn show_flows(
974    stmt: ShowFlows,
975    query_engine: &QueryEngineRef,
976    catalog_manager: &CatalogManagerRef,
977    query_ctx: QueryContextRef,
978) -> Result<Output> {
979    let projects = vec![(flows::FLOW_NAME, FLOWS_COLUMN)];
980    let filters = vec![col(flows::TABLE_CATALOG).eq(lit(query_ctx.current_catalog()))];
981    let like_field = Some(flows::FLOW_NAME);
982    let sort = vec![col(flows::FLOW_NAME).sort(true, true)];
983
984    query_from_information_schema_table(
985        query_engine,
986        catalog_manager,
987        query_ctx,
988        FLOWS,
989        vec![],
990        projects,
991        filters,
992        like_field,
993        sort,
994        stmt.kind,
995    )
996    .await
997}
998
999#[cfg(feature = "enterprise")]
1000pub async fn show_triggers(
1001    stmt: sql::statements::show::trigger::ShowTriggers,
1002    query_engine: &QueryEngineRef,
1003    catalog_manager: &CatalogManagerRef,
1004    query_ctx: QueryContextRef,
1005) -> Result<Output> {
1006    const TRIGGER_NAME: &str = "trigger_name";
1007    const TRIGGERS_COLUMN: &str = "Triggers";
1008
1009    let projects = vec![(TRIGGER_NAME, TRIGGERS_COLUMN)];
1010    let like_field = Some(TRIGGER_NAME);
1011    let sort = vec![col(TRIGGER_NAME).sort(true, true)];
1012
1013    query_from_information_schema_table(
1014        query_engine,
1015        catalog_manager,
1016        query_ctx,
1017        catalog::information_schema::TRIGGERS,
1018        vec![],
1019        projects,
1020        vec![],
1021        like_field,
1022        sort,
1023        stmt.kind,
1024    )
1025    .await
1026}
1027
1028pub fn show_create_flow(
1029    flow_name: ObjectName,
1030    flow_val: FlowInfoValue,
1031    query_ctx: QueryContextRef,
1032) -> Result<Output> {
1033    let mut parser_ctx =
1034        ParserContext::new(query_ctx.sql_dialect(), flow_val.raw_sql()).context(error::SqlSnafu)?;
1035
1036    let query = parser_ctx.parse_statement().context(error::SqlSnafu)?;
1037
1038    // since prom ql will parse `now()` to a fixed time, we need to not use it for generating raw query
1039    let raw_query = match &query {
1040        Statement::Tql(_) => flow_val.raw_sql().clone(),
1041        _ => query.to_string(),
1042    };
1043
1044    let query = Box::new(SqlOrTql::try_from_statement(query, &raw_query).context(error::SqlSnafu)?);
1045
1046    let comment = if flow_val.comment().is_empty() {
1047        None
1048    } else {
1049        Some(flow_val.comment().clone())
1050    };
1051
1052    let stmt = CreateFlow {
1053        flow_name,
1054        sink_table_name: ObjectName::from(vec![Ident::new(&flow_val.sink_table_name().table_name)]),
1055        // notice we don't want `OR REPLACE` and `IF NOT EXISTS` in same sql since it's unclear what to do
1056        // so we set `or_replace` to false.
1057        or_replace: false,
1058        if_not_exists: true,
1059        expire_after: flow_val.expire_after(),
1060        eval_interval: flow_val.eval_interval(),
1061        comment,
1062        query,
1063    };
1064
1065    let sql = format!("{}", stmt);
1066    let columns = vec![
1067        Arc::new(StringVector::from(vec![flow_val.flow_name().clone()])) as _,
1068        Arc::new(StringVector::from(vec![sql])) as _,
1069    ];
1070    let records = RecordBatches::try_from_columns(SHOW_CREATE_FLOW_OUTPUT_SCHEMA.clone(), columns)
1071        .context(error::CreateRecordBatchSnafu)?;
1072
1073    Ok(Output::new_with_record_batches(records))
1074}
1075
1076pub fn describe_table(table: TableRef) -> Result<Output> {
1077    let table_info = table.table_info();
1078    let columns_schemas = table_info.meta.schema.column_schemas();
1079    let columns = vec![
1080        describe_column_names(columns_schemas),
1081        describe_column_types(columns_schemas),
1082        describe_column_keys(columns_schemas, &table_info.meta.primary_key_indices),
1083        describe_column_nullables(columns_schemas),
1084        describe_column_defaults(columns_schemas),
1085        describe_column_semantic_types(columns_schemas, &table_info.meta.primary_key_indices),
1086    ];
1087    let records = RecordBatches::try_from_columns(DESCRIBE_TABLE_OUTPUT_SCHEMA.clone(), columns)
1088        .context(error::CreateRecordBatchSnafu)?;
1089    Ok(Output::new_with_record_batches(records))
1090}
1091
1092fn describe_column_names(columns_schemas: &[ColumnSchema]) -> VectorRef {
1093    Arc::new(StringVector::from_iterator(
1094        columns_schemas.iter().map(|cs| cs.name.as_str()),
1095    ))
1096}
1097
1098fn describe_column_types(columns_schemas: &[ColumnSchema]) -> VectorRef {
1099    Arc::new(StringVector::from(
1100        columns_schemas
1101            .iter()
1102            .map(|cs| cs.data_type.name())
1103            .collect::<Vec<_>>(),
1104    ))
1105}
1106
1107fn describe_column_keys(
1108    columns_schemas: &[ColumnSchema],
1109    primary_key_indices: &[usize],
1110) -> VectorRef {
1111    Arc::new(StringVector::from_iterator(
1112        columns_schemas.iter().enumerate().map(|(i, cs)| {
1113            if cs.is_time_index() || primary_key_indices.contains(&i) {
1114                PRI_KEY
1115            } else {
1116                ""
1117            }
1118        }),
1119    ))
1120}
1121
1122fn describe_column_nullables(columns_schemas: &[ColumnSchema]) -> VectorRef {
1123    Arc::new(StringVector::from_iterator(columns_schemas.iter().map(
1124        |cs| {
1125            if cs.is_nullable() { YES_STR } else { NO_STR }
1126        },
1127    )))
1128}
1129
1130fn describe_column_defaults(columns_schemas: &[ColumnSchema]) -> VectorRef {
1131    Arc::new(StringVector::from(
1132        columns_schemas
1133            .iter()
1134            .map(|cs| {
1135                cs.default_constraint()
1136                    .map_or(String::from(""), |dc| dc.to_string())
1137            })
1138            .collect::<Vec<String>>(),
1139    ))
1140}
1141
1142fn describe_column_semantic_types(
1143    columns_schemas: &[ColumnSchema],
1144    primary_key_indices: &[usize],
1145) -> VectorRef {
1146    Arc::new(StringVector::from_iterator(
1147        columns_schemas.iter().enumerate().map(|(i, cs)| {
1148            if primary_key_indices.contains(&i) {
1149                SEMANTIC_TYPE_PRIMARY_KEY
1150            } else if cs.is_time_index() {
1151                SEMANTIC_TYPE_TIME_INDEX
1152            } else {
1153                SEMANTIC_TYPE_FIELD
1154            }
1155        }),
1156    ))
1157}
1158
1159// lists files in the frontend to reduce unnecessary scan requests repeated in each datanode.
1160pub async fn prepare_file_table_files(
1161    options: &HashMap<String, String>,
1162) -> Result<(ObjectStore, Vec<String>)> {
1163    let url = options
1164        .get(FILE_TABLE_LOCATION_KEY)
1165        .context(error::MissingRequiredFieldSnafu {
1166            name: FILE_TABLE_LOCATION_KEY,
1167        })?;
1168
1169    let (dir, filename) = find_dir_and_filename(url);
1170    let source = if let Some(filename) = filename {
1171        Source::Filename(filename)
1172    } else {
1173        Source::Dir
1174    };
1175    let regex = options
1176        .get(FILE_TABLE_PATTERN_KEY)
1177        .map(|x| Regex::new(x))
1178        .transpose()
1179        .context(error::BuildRegexSnafu)?;
1180    let object_store = build_backend(url, options).context(error::BuildBackendSnafu)?;
1181    let lister = Lister::new(object_store.clone(), source, dir, regex);
1182    // If we scan files in a directory every time the database restarts,
1183    // then it might lead to a potential undefined behavior:
1184    // If a user adds a file with an incompatible schema to that directory,
1185    // it will make the external table unavailable.
1186    let files = lister
1187        .list()
1188        .await
1189        .context(error::ListObjectsSnafu)?
1190        .into_iter()
1191        .filter_map(|entry| {
1192            if entry.path().ends_with('/') {
1193                None
1194            } else {
1195                Some(entry.path().to_string())
1196            }
1197        })
1198        .collect::<Vec<_>>();
1199    Ok((object_store, files))
1200}
1201
1202pub async fn infer_file_table_schema(
1203    object_store: &ObjectStore,
1204    files: &[String],
1205    options: &HashMap<String, String>,
1206) -> Result<RawSchema> {
1207    let format = parse_file_table_format(options)?;
1208    let merged = infer_schemas(object_store, files, format.as_ref())
1209        .await
1210        .context(error::InferSchemaSnafu)?;
1211    Ok(RawSchema::from(
1212        &Schema::try_from(merged).context(error::ConvertSchemaSnafu)?,
1213    ))
1214}
1215
1216// Converts the file column schemas to table column schemas.
1217// Returns the column schemas and the time index column name.
1218//
1219// More specifically, this function will do the following:
1220// 1. Add a default time index column if there is no time index column
1221//    in the file column schemas, or
1222// 2. If the file column schemas contain a column with name conflicts with
1223//    the default time index column, it will replace the column schema
1224//    with the default one.
1225pub fn file_column_schemas_to_table(
1226    file_column_schemas: &[ColumnSchema],
1227) -> (Vec<ColumnSchema>, String) {
1228    let mut column_schemas = file_column_schemas.to_owned();
1229    if let Some(time_index_column) = column_schemas.iter().find(|c| c.is_time_index()) {
1230        let time_index = time_index_column.name.clone();
1231        return (column_schemas, time_index);
1232    }
1233
1234    let timestamp_type = ConcreteDataType::timestamp_millisecond_datatype();
1235    let default_zero = Value::Timestamp(Timestamp::new_millisecond(0));
1236    let timestamp_column_schema = ColumnSchema::new(greptime_timestamp(), timestamp_type, false)
1237        .with_time_index(true)
1238        .with_default_constraint(Some(ColumnDefaultConstraint::Value(default_zero)))
1239        .unwrap();
1240
1241    if let Some(column_schema) = column_schemas
1242        .iter_mut()
1243        .find(|column_schema| column_schema.name == greptime_timestamp())
1244    {
1245        // Replace the column schema with the default one
1246        *column_schema = timestamp_column_schema;
1247    } else {
1248        column_schemas.push(timestamp_column_schema);
1249    }
1250
1251    (column_schemas, greptime_timestamp().to_string())
1252}
1253
1254/// This function checks if the column schemas from a file can be matched with
1255/// the column schemas of a table.
1256///
1257/// More specifically, for each column seen in the table schema,
1258/// - If the same column does exist in the file schema, it checks if the data
1259///   type of the file column can be casted into the form of the table column.
1260/// - If the same column does not exist in the file schema, it checks if the
1261///   table column is nullable or has a default constraint.
1262pub fn check_file_to_table_schema_compatibility(
1263    file_column_schemas: &[ColumnSchema],
1264    table_column_schemas: &[ColumnSchema],
1265) -> Result<()> {
1266    let file_schemas_map = file_column_schemas
1267        .iter()
1268        .map(|s| (s.name.clone(), s))
1269        .collect::<HashMap<_, _>>();
1270
1271    for table_column in table_column_schemas {
1272        if let Some(file_column) = file_schemas_map.get(&table_column.name) {
1273            // TODO(zhongzc): a temporary solution, we should use `can_cast_to` once it's ready.
1274            ensure!(
1275                file_column
1276                    .data_type
1277                    .can_arrow_type_cast_to(&table_column.data_type),
1278                error::ColumnSchemaIncompatibleSnafu {
1279                    column: table_column.name.clone(),
1280                    file_type: file_column.data_type.clone(),
1281                    table_type: table_column.data_type.clone(),
1282                }
1283            );
1284        } else {
1285            ensure!(
1286                table_column.is_nullable() || table_column.default_constraint().is_some(),
1287                error::ColumnSchemaNoDefaultSnafu {
1288                    column: table_column.name.clone(),
1289                }
1290            );
1291        }
1292    }
1293
1294    Ok(())
1295}
1296
1297fn parse_file_table_format(options: &HashMap<String, String>) -> Result<Box<dyn FileFormat>> {
1298    Ok(
1299        match Format::try_from(options).context(error::ParseFileFormatSnafu)? {
1300            Format::Csv(format) => Box::new(format),
1301            Format::Json(format) => Box::new(format),
1302            Format::Parquet(format) => Box::new(format),
1303            Format::Orc(format) => Box::new(format),
1304        },
1305    )
1306}
1307
1308pub async fn show_processlist(
1309    stmt: ShowProcessList,
1310    query_engine: &QueryEngineRef,
1311    catalog_manager: &CatalogManagerRef,
1312    query_ctx: QueryContextRef,
1313) -> Result<Output> {
1314    let projects = if stmt.full {
1315        vec![
1316            (process_list::ID, "Id"),
1317            (process_list::CATALOG, "Catalog"),
1318            (process_list::SCHEMAS, "Schema"),
1319            (process_list::CLIENT, "Client"),
1320            (process_list::FRONTEND, "Frontend"),
1321            (process_list::START_TIMESTAMP, "Start Time"),
1322            (process_list::ELAPSED_TIME, "Elapsed Time"),
1323            (process_list::QUERY, "Query"),
1324        ]
1325    } else {
1326        vec![
1327            (process_list::ID, "Id"),
1328            (process_list::CATALOG, "Catalog"),
1329            (process_list::QUERY, "Query"),
1330            (process_list::ELAPSED_TIME, "Elapsed Time"),
1331        ]
1332    };
1333
1334    let filters = vec![];
1335    let like_field = None;
1336    let sort = vec![col("id").sort(true, true)];
1337    query_from_information_schema_table(
1338        query_engine,
1339        catalog_manager,
1340        query_ctx.clone(),
1341        "process_list",
1342        vec![],
1343        projects.clone(),
1344        filters,
1345        like_field,
1346        sort,
1347        ShowKind::All,
1348    )
1349    .await
1350}
1351
1352#[cfg(test)]
1353mod test {
1354    use std::sync::Arc;
1355
1356    use common_query::{Output, OutputData};
1357    use common_recordbatch::{RecordBatch, RecordBatches};
1358    use common_time::Timezone;
1359    use common_time::timestamp::TimeUnit;
1360    use datatypes::prelude::ConcreteDataType;
1361    use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema, SchemaRef};
1362    use datatypes::vectors::{StringVector, TimestampMillisecondVector, UInt32Vector, VectorRef};
1363    use session::context::QueryContextBuilder;
1364    use snafu::ResultExt;
1365    use sql::ast::{Ident, ObjectName};
1366    use sql::statements::show::ShowVariables;
1367    use table::TableRef;
1368    use table::test_util::MemTable;
1369
1370    use super::show_variable;
1371    use crate::error;
1372    use crate::error::Result;
1373    use crate::sql::{
1374        DESCRIBE_TABLE_OUTPUT_SCHEMA, NO_STR, SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_TIME_INDEX,
1375        YES_STR, describe_table,
1376    };
1377
1378    #[test]
1379    fn test_describe_table_multiple_columns() -> Result<()> {
1380        let table_name = "test_table";
1381        let schema = vec![
1382            ColumnSchema::new("t1", ConcreteDataType::uint32_datatype(), true),
1383            ColumnSchema::new(
1384                "t2",
1385                ConcreteDataType::timestamp_datatype(TimeUnit::Millisecond),
1386                false,
1387            )
1388            .with_default_constraint(Some(ColumnDefaultConstraint::Function(String::from(
1389                "current_timestamp()",
1390            ))))
1391            .unwrap()
1392            .with_time_index(true),
1393        ];
1394        let data = vec![
1395            Arc::new(UInt32Vector::from_slice([0])) as _,
1396            Arc::new(TimestampMillisecondVector::from_slice([0])) as _,
1397        ];
1398        let expected_columns = vec![
1399            Arc::new(StringVector::from(vec!["t1", "t2"])) as _,
1400            Arc::new(StringVector::from(vec!["UInt32", "TimestampMillisecond"])) as _,
1401            Arc::new(StringVector::from(vec!["", "PRI"])) as _,
1402            Arc::new(StringVector::from(vec![YES_STR, NO_STR])) as _,
1403            Arc::new(StringVector::from(vec!["", "current_timestamp()"])) as _,
1404            Arc::new(StringVector::from(vec![
1405                SEMANTIC_TYPE_FIELD,
1406                SEMANTIC_TYPE_TIME_INDEX,
1407            ])) as _,
1408        ];
1409
1410        describe_table_test_by_schema(table_name, schema, data, expected_columns)
1411    }
1412
1413    fn describe_table_test_by_schema(
1414        table_name: &str,
1415        schema: Vec<ColumnSchema>,
1416        data: Vec<VectorRef>,
1417        expected_columns: Vec<VectorRef>,
1418    ) -> Result<()> {
1419        let table_schema = SchemaRef::new(Schema::new(schema));
1420        let table = prepare_describe_table(table_name, table_schema, data);
1421
1422        let expected =
1423            RecordBatches::try_from_columns(DESCRIBE_TABLE_OUTPUT_SCHEMA.clone(), expected_columns)
1424                .context(error::CreateRecordBatchSnafu)?;
1425
1426        if let OutputData::RecordBatches(res) = describe_table(table)?.data {
1427            assert_eq!(res.take(), expected.take());
1428        } else {
1429            panic!("describe table must return record batch");
1430        }
1431
1432        Ok(())
1433    }
1434
1435    fn prepare_describe_table(
1436        table_name: &str,
1437        table_schema: SchemaRef,
1438        data: Vec<VectorRef>,
1439    ) -> TableRef {
1440        let record_batch = RecordBatch::new(table_schema, data).unwrap();
1441        MemTable::table(table_name, record_batch)
1442    }
1443
1444    #[test]
1445    fn test_show_variable() {
1446        assert_eq!(
1447            exec_show_variable("SYSTEM_TIME_ZONE", "Asia/Shanghai").unwrap(),
1448            "UTC"
1449        );
1450        assert_eq!(
1451            exec_show_variable("SYSTEM_TIMEZONE", "Asia/Shanghai").unwrap(),
1452            "UTC"
1453        );
1454        assert_eq!(
1455            exec_show_variable("TIME_ZONE", "Asia/Shanghai").unwrap(),
1456            "Asia/Shanghai"
1457        );
1458        assert_eq!(
1459            exec_show_variable("TIMEZONE", "Asia/Shanghai").unwrap(),
1460            "Asia/Shanghai"
1461        );
1462        assert!(exec_show_variable("TIME ZONE", "Asia/Shanghai").is_err());
1463        assert!(exec_show_variable("SYSTEM TIME ZONE", "Asia/Shanghai").is_err());
1464    }
1465
1466    fn exec_show_variable(variable: &str, tz: &str) -> Result<String> {
1467        let stmt = ShowVariables {
1468            variable: ObjectName::from(vec![Ident::new(variable)]),
1469        };
1470        let ctx = Arc::new(
1471            QueryContextBuilder::default()
1472                .timezone(Timezone::from_tz_string(tz).unwrap())
1473                .build(),
1474        );
1475        match show_variable(stmt, ctx) {
1476            Ok(Output {
1477                data: OutputData::RecordBatches(record),
1478                ..
1479            }) => {
1480                let record = record.take().first().cloned().unwrap();
1481                Ok(record.iter_column_as_string(0).next().unwrap().unwrap())
1482            }
1483            Ok(_) => unreachable!(),
1484            Err(e) => Err(e),
1485        }
1486    }
1487}