Skip to main content

query/
sql.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod show_create_table;
16
17use std::collections::HashMap;
18use std::ops::ControlFlow;
19use std::sync::Arc;
20
21use catalog::CatalogManagerRef;
22use catalog::information_schema::{
23    CHARACTER_SETS, COLLATIONS, COLUMNS, FLOWS, REGION_PEERS, SCHEMATA, STATISTICS, TABLES, VIEWS,
24    columns, flows, process_list, region_peers, schemata, statistics, tables,
25};
26use common_catalog::consts::{
27    INFORMATION_SCHEMA_NAME, SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_PRIMARY_KEY,
28    SEMANTIC_TYPE_TIME_INDEX,
29};
30use common_catalog::format_full_table_name;
31use common_datasource::file_format::{FileFormat, Format, infer_schemas};
32use common_datasource::lister::{Lister, Source};
33use common_datasource::object_store::build_backend;
34use common_datasource::util::find_dir_and_filename;
35use common_meta::SchemaOptions;
36use common_meta::ddl::create_flow::FlowType;
37use common_meta::key::flow::flow_info::FlowInfoValue;
38use common_query::Output;
39use common_query::prelude::greptime_timestamp;
40use common_recordbatch::RecordBatches;
41use common_recordbatch::adapter::RecordBatchStreamAdapter;
42use common_time::Timestamp;
43use common_time::timezone::get_timezone;
44use datafusion::prelude::SessionContext;
45use datafusion_expr::{Expr, SortExpr, col, lit};
46use datatypes::prelude::*;
47use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema};
48use datatypes::vectors::StringVector;
49use itertools::Itertools;
50use object_store::ObjectStore;
51use once_cell::sync::Lazy;
52use regex::Regex;
53use session::context::{Channel, QueryContextRef};
54pub use show_create_table::create_table_stmt;
55use snafu::{OptionExt, ResultExt, ensure};
56use sql::ast::{Ident, visit_expressions_mut};
57use sql::parser::ParserContext;
58use sql::statements::OptionMap;
59use sql::statements::create::{CreateDatabase, CreateFlow, CreateView, Partitions, SqlOrTql};
60use sql::statements::show::{
61    ShowColumns, ShowDatabases, ShowFlows, ShowIndex, ShowKind, ShowProcessList, ShowRegion,
62    ShowTableStatus, ShowTables, ShowVariables, ShowViews,
63};
64use sql::statements::statement::Statement;
65use sqlparser::ast::ObjectName;
66use store_api::metric_engine_consts::{is_metric_engine, is_metric_engine_internal_column};
67use table::TableRef;
68use table::metadata::TableInfoRef;
69use table::requests::{FILE_TABLE_LOCATION_KEY, FILE_TABLE_PATTERN_KEY};
70
71use crate::QueryEngineRef;
72use crate::error::{self, Result, UnsupportedVariableSnafu};
73use crate::planner::DfLogicalPlanner;
74
75const SCHEMAS_COLUMN: &str = "Database";
76const OPTIONS_COLUMN: &str = "Options";
77const VIEWS_COLUMN: &str = "Views";
78const FLOWS_COLUMN: &str = "Flows";
79const FIELD_COLUMN: &str = "Field";
80const TABLE_TYPE_COLUMN: &str = "Table_type";
81
82const COLUMN_NAME_COLUMN: &str = "Column";
83const COLUMN_GREPTIME_TYPE_COLUMN: &str = "Greptime_type";
84const COLUMN_TYPE_COLUMN: &str = "Type";
85const COLUMN_KEY_COLUMN: &str = "Key";
86const COLUMN_EXTRA_COLUMN: &str = "Extra";
87const COLUMN_PRIVILEGES_COLUMN: &str = "Privileges";
88const COLUMN_COLLATION_COLUMN: &str = "Collation";
89const COLUMN_NULLABLE_COLUMN: &str = "Null";
90const COLUMN_DEFAULT_COLUMN: &str = "Default";
91const COLUMN_COMMENT_COLUMN: &str = "Comment";
92const COLUMN_SEMANTIC_TYPE_COLUMN: &str = "Semantic Type";
93
94const YES_STR: &str = "YES";
95const NO_STR: &str = "NO";
96const PRI_KEY: &str = "PRI";
97
98/// SHOW index columns
99const INDEX_TABLE_COLUMN: &str = "Table";
100const INDEX_NONT_UNIQUE_COLUMN: &str = "Non_unique";
101const INDEX_CARDINALITY_COLUMN: &str = "Cardinality";
102const INDEX_SUB_PART_COLUMN: &str = "Sub_part";
103const INDEX_PACKED_COLUMN: &str = "Packed";
104const INDEX_INDEX_TYPE_COLUMN: &str = "Index_type";
105const INDEX_COMMENT_COLUMN: &str = "Index_comment";
106const INDEX_VISIBLE_COLUMN: &str = "Visible";
107const INDEX_EXPRESSION_COLUMN: &str = "Expression";
108const INDEX_KEY_NAME_COLUMN: &str = "Key_name";
109const INDEX_SEQ_IN_INDEX_COLUMN: &str = "Seq_in_index";
110const INDEX_COLUMN_NAME_COLUMN: &str = "Column_name";
111
112static DESCRIBE_TABLE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
113    Arc::new(Schema::new(vec![
114        ColumnSchema::new(
115            COLUMN_NAME_COLUMN,
116            ConcreteDataType::string_datatype(),
117            false,
118        ),
119        ColumnSchema::new(
120            COLUMN_TYPE_COLUMN,
121            ConcreteDataType::string_datatype(),
122            false,
123        ),
124        ColumnSchema::new(COLUMN_KEY_COLUMN, ConcreteDataType::string_datatype(), true),
125        ColumnSchema::new(
126            COLUMN_NULLABLE_COLUMN,
127            ConcreteDataType::string_datatype(),
128            false,
129        ),
130        ColumnSchema::new(
131            COLUMN_DEFAULT_COLUMN,
132            ConcreteDataType::string_datatype(),
133            false,
134        ),
135        ColumnSchema::new(
136            COLUMN_SEMANTIC_TYPE_COLUMN,
137            ConcreteDataType::string_datatype(),
138            false,
139        ),
140    ]))
141});
142
143static SHOW_CREATE_DATABASE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
144    Arc::new(Schema::new(vec![
145        ColumnSchema::new("Database", ConcreteDataType::string_datatype(), false),
146        ColumnSchema::new(
147            "Create Database",
148            ConcreteDataType::string_datatype(),
149            false,
150        ),
151    ]))
152});
153
154static SHOW_CREATE_TABLE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
155    Arc::new(Schema::new(vec![
156        ColumnSchema::new("Table", ConcreteDataType::string_datatype(), false),
157        ColumnSchema::new("Create Table", ConcreteDataType::string_datatype(), false),
158    ]))
159});
160
161static SHOW_CREATE_FLOW_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
162    Arc::new(Schema::new(vec![
163        ColumnSchema::new("Flow", ConcreteDataType::string_datatype(), false),
164        ColumnSchema::new("Create Flow", ConcreteDataType::string_datatype(), false),
165    ]))
166});
167
168static SHOW_CREATE_VIEW_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
169    Arc::new(Schema::new(vec![
170        ColumnSchema::new("View", ConcreteDataType::string_datatype(), false),
171        ColumnSchema::new("Create View", ConcreteDataType::string_datatype(), false),
172    ]))
173});
174
175pub async fn show_databases(
176    stmt: ShowDatabases,
177    query_engine: &QueryEngineRef,
178    catalog_manager: &CatalogManagerRef,
179    query_ctx: QueryContextRef,
180) -> Result<Output> {
181    let projects = if stmt.full {
182        vec![
183            (schemata::SCHEMA_NAME, SCHEMAS_COLUMN),
184            (schemata::SCHEMA_OPTS, OPTIONS_COLUMN),
185        ]
186    } else {
187        vec![(schemata::SCHEMA_NAME, SCHEMAS_COLUMN)]
188    };
189
190    let filters = vec![col(schemata::CATALOG_NAME).eq(lit(query_ctx.current_catalog()))];
191    let like_field = Some(schemata::SCHEMA_NAME);
192    let sort = vec![col(schemata::SCHEMA_NAME).sort(true, true)];
193
194    query_from_information_schema_table(
195        query_engine,
196        catalog_manager,
197        query_ctx,
198        SCHEMATA,
199        vec![],
200        projects,
201        filters,
202        like_field,
203        sort,
204        stmt.kind,
205    )
206    .await
207}
208
209/// Replaces column identifier references in a SQL expression.
210/// Used for backward compatibility where old column names should work with new ones.
211fn replace_column_in_expr(expr: &mut sqlparser::ast::Expr, from_column: &str, to_column: &str) {
212    let _ = visit_expressions_mut(expr, |e| {
213        match e {
214            sqlparser::ast::Expr::Identifier(ident)
215                if ident.value.eq_ignore_ascii_case(from_column) =>
216            {
217                ident.value = to_column.to_string();
218            }
219            sqlparser::ast::Expr::CompoundIdentifier(idents) => {
220                if let Some(last) = idents.last_mut()
221                    && last.value.eq_ignore_ascii_case(from_column)
222                {
223                    last.value = to_column.to_string();
224                }
225            }
226            _ => {}
227        }
228        ControlFlow::<()>::Continue(())
229    });
230}
231
232/// Cast a `show` statement execution into a query from tables in  `information_schema`.
233/// - `table_name`: the table name in `information_schema`,
234/// - `projects`: query projection, a list of `(column, renamed_column)`,
235/// - `filters`: filter expressions for query,
236/// - `like_field`: the field to filter by the predicate `ShowKind::Like`,
237/// - `sort`: sort the results by the specified sorting expressions,
238/// - `kind`: the show kind
239#[allow(clippy::too_many_arguments)]
240async fn query_from_information_schema_table(
241    query_engine: &QueryEngineRef,
242    catalog_manager: &CatalogManagerRef,
243    query_ctx: QueryContextRef,
244    table_name: &str,
245    select: Vec<Expr>,
246    projects: Vec<(&str, &str)>,
247    filters: Vec<Expr>,
248    like_field: Option<&str>,
249    sort: Vec<SortExpr>,
250    kind: ShowKind,
251) -> Result<Output> {
252    let table = catalog_manager
253        .table(
254            query_ctx.current_catalog(),
255            INFORMATION_SCHEMA_NAME,
256            table_name,
257            Some(&query_ctx),
258        )
259        .await
260        .context(error::CatalogSnafu)?
261        .with_context(|| error::TableNotFoundSnafu {
262            table: format_full_table_name(
263                query_ctx.current_catalog(),
264                INFORMATION_SCHEMA_NAME,
265                table_name,
266            ),
267        })?;
268
269    let dataframe = query_engine.read_table(table)?;
270
271    // Apply filters
272    let dataframe = filters.into_iter().try_fold(dataframe, |df, expr| {
273        df.filter(expr).context(error::PlanSqlSnafu)
274    })?;
275
276    // Apply `like` predicate if exists
277    let dataframe = if let (ShowKind::Like(ident), Some(field)) = (&kind, like_field) {
278        dataframe
279            .filter(col(field).like(lit(ident.value.clone())))
280            .context(error::PlanSqlSnafu)?
281    } else {
282        dataframe
283    };
284
285    // Apply sorting
286    let dataframe = if sort.is_empty() {
287        dataframe
288    } else {
289        dataframe.sort(sort).context(error::PlanSqlSnafu)?
290    };
291
292    // Apply select
293    let dataframe = if select.is_empty() {
294        if projects.is_empty() {
295            dataframe
296        } else {
297            let projection = projects
298                .iter()
299                .map(|x| col(x.0).alias(x.1))
300                .collect::<Vec<_>>();
301            dataframe.select(projection).context(error::PlanSqlSnafu)?
302        }
303    } else {
304        dataframe.select(select).context(error::PlanSqlSnafu)?
305    };
306
307    // Apply projection
308    let dataframe = projects
309        .into_iter()
310        .try_fold(dataframe, |df, (column, renamed_column)| {
311            df.with_column_renamed(column, renamed_column)
312                .context(error::PlanSqlSnafu)
313        })?;
314
315    let dataframe = match kind {
316        ShowKind::All | ShowKind::Like(_) => {
317            // Like kind is processed above
318            dataframe
319        }
320        ShowKind::Where(filter) => {
321            // Cast the results into VIEW for `where` clause,
322            // which is evaluated against the column names displayed by the SHOW statement.
323            let view = dataframe.into_view();
324            let dataframe = SessionContext::new_with_state(
325                query_engine
326                    .engine_context(query_ctx.clone())
327                    .state()
328                    .clone(),
329            )
330            .read_table(view)?;
331
332            let planner = query_engine.planner();
333            let planner = planner
334                .as_any()
335                .downcast_ref::<DfLogicalPlanner>()
336                .expect("Must be the datafusion planner");
337
338            let filter = planner
339                .sql_to_expr(filter, dataframe.schema(), false, query_ctx)
340                .await?;
341
342            // Apply the `where` clause filters
343            dataframe.filter(filter).context(error::PlanSqlSnafu)?
344        }
345    };
346
347    let stream = dataframe.execute_stream().await?;
348
349    Ok(Output::new_with_stream(Box::pin(
350        RecordBatchStreamAdapter::try_new(stream).context(error::CreateRecordBatchSnafu)?,
351    )))
352}
353
354/// Execute `SHOW COLUMNS` statement.
355pub async fn show_columns(
356    stmt: ShowColumns,
357    query_engine: &QueryEngineRef,
358    catalog_manager: &CatalogManagerRef,
359    query_ctx: QueryContextRef,
360) -> Result<Output> {
361    let schema_name = if let Some(database) = stmt.database {
362        database
363    } else {
364        query_ctx.current_schema()
365    };
366
367    let projects = if stmt.full {
368        vec![
369            (columns::COLUMN_NAME, FIELD_COLUMN),
370            (columns::DATA_TYPE, COLUMN_TYPE_COLUMN),
371            (columns::COLLATION_NAME, COLUMN_COLLATION_COLUMN),
372            (columns::IS_NULLABLE, COLUMN_NULLABLE_COLUMN),
373            (columns::COLUMN_KEY, COLUMN_KEY_COLUMN),
374            (columns::COLUMN_DEFAULT, COLUMN_DEFAULT_COLUMN),
375            (columns::COLUMN_COMMENT, COLUMN_COMMENT_COLUMN),
376            (columns::PRIVILEGES, COLUMN_PRIVILEGES_COLUMN),
377            (columns::EXTRA, COLUMN_EXTRA_COLUMN),
378            (columns::GREPTIME_DATA_TYPE, COLUMN_GREPTIME_TYPE_COLUMN),
379        ]
380    } else {
381        vec![
382            (columns::COLUMN_NAME, FIELD_COLUMN),
383            (columns::DATA_TYPE, COLUMN_TYPE_COLUMN),
384            (columns::IS_NULLABLE, COLUMN_NULLABLE_COLUMN),
385            (columns::COLUMN_KEY, COLUMN_KEY_COLUMN),
386            (columns::COLUMN_DEFAULT, COLUMN_DEFAULT_COLUMN),
387            (columns::EXTRA, COLUMN_EXTRA_COLUMN),
388            (columns::GREPTIME_DATA_TYPE, COLUMN_GREPTIME_TYPE_COLUMN),
389        ]
390    };
391
392    let filters = vec![
393        col(columns::TABLE_NAME).eq(lit(&stmt.table)),
394        col(columns::TABLE_SCHEMA).eq(lit(schema_name.clone())),
395        col(columns::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
396    ];
397    let like_field = Some(columns::COLUMN_NAME);
398    let sort = vec![col(columns::COLUMN_NAME).sort(true, true)];
399
400    query_from_information_schema_table(
401        query_engine,
402        catalog_manager,
403        query_ctx,
404        COLUMNS,
405        vec![],
406        projects,
407        filters,
408        like_field,
409        sort,
410        stmt.kind,
411    )
412    .await
413}
414
415/// Execute `SHOW INDEX` statement.
416pub async fn show_index(
417    stmt: ShowIndex,
418    query_engine: &QueryEngineRef,
419    catalog_manager: &CatalogManagerRef,
420    query_ctx: QueryContextRef,
421) -> Result<Output> {
422    let schema_name = if let Some(database) = stmt.database {
423        database
424    } else {
425        query_ctx.current_schema()
426    };
427
428    let select = vec![
429        col(statistics::TABLE_NAME).alias(INDEX_TABLE_COLUMN),
430        col(statistics::NON_UNIQUE).alias(INDEX_NONT_UNIQUE_COLUMN),
431        col(statistics::INDEX_NAME).alias(INDEX_KEY_NAME_COLUMN),
432        col(statistics::SEQ_IN_INDEX).alias(INDEX_SEQ_IN_INDEX_COLUMN),
433        col(statistics::COLUMN_NAME).alias(INDEX_COLUMN_NAME_COLUMN),
434        col(statistics::COLLATION).alias(COLUMN_COLLATION_COLUMN),
435        col(statistics::CARDINALITY).alias(INDEX_CARDINALITY_COLUMN),
436        col(statistics::SUB_PART).alias(INDEX_SUB_PART_COLUMN),
437        col(statistics::PACKED).alias(INDEX_PACKED_COLUMN),
438        col(statistics::NULLABLE).alias(COLUMN_NULLABLE_COLUMN),
439        col(statistics::INDEX_TYPE).alias(INDEX_INDEX_TYPE_COLUMN),
440        col(statistics::COMMENT).alias(COLUMN_COMMENT_COLUMN),
441        col(statistics::INDEX_COMMENT).alias(INDEX_COMMENT_COLUMN),
442        col(statistics::IS_VISIBLE).alias(INDEX_VISIBLE_COLUMN),
443        col(statistics::EXPRESSION).alias(INDEX_EXPRESSION_COLUMN),
444    ];
445
446    let projects = vec![
447        (statistics::TABLE_NAME, INDEX_TABLE_COLUMN),
448        (INDEX_NONT_UNIQUE_COLUMN, INDEX_NONT_UNIQUE_COLUMN),
449        (statistics::INDEX_NAME, INDEX_KEY_NAME_COLUMN),
450        (statistics::SEQ_IN_INDEX, INDEX_SEQ_IN_INDEX_COLUMN),
451        (statistics::COLUMN_NAME, INDEX_COLUMN_NAME_COLUMN),
452        (COLUMN_COLLATION_COLUMN, COLUMN_COLLATION_COLUMN),
453        (INDEX_CARDINALITY_COLUMN, INDEX_CARDINALITY_COLUMN),
454        (INDEX_SUB_PART_COLUMN, INDEX_SUB_PART_COLUMN),
455        (INDEX_PACKED_COLUMN, INDEX_PACKED_COLUMN),
456        (COLUMN_NULLABLE_COLUMN, COLUMN_NULLABLE_COLUMN),
457        (statistics::INDEX_TYPE, INDEX_INDEX_TYPE_COLUMN),
458        (COLUMN_COMMENT_COLUMN, COLUMN_COMMENT_COLUMN),
459        (INDEX_COMMENT_COLUMN, INDEX_COMMENT_COLUMN),
460        (INDEX_VISIBLE_COLUMN, INDEX_VISIBLE_COLUMN),
461        (INDEX_EXPRESSION_COLUMN, INDEX_EXPRESSION_COLUMN),
462    ];
463
464    let filters = vec![
465        col(statistics::TABLE_NAME).eq(lit(&stmt.table)),
466        col(statistics::TABLE_SCHEMA).eq(lit(schema_name.clone())),
467    ];
468    let like_field = None;
469    let sort = vec![
470        col(statistics::INDEX_NAME).sort(true, true),
471        col(statistics::SEQ_IN_INDEX).sort(true, true),
472    ];
473
474    query_from_information_schema_table(
475        query_engine,
476        catalog_manager,
477        query_ctx,
478        STATISTICS,
479        select,
480        projects,
481        filters,
482        like_field,
483        sort,
484        stmt.kind,
485    )
486    .await
487}
488
489/// Execute `SHOW REGION` statement.
490pub async fn show_region(
491    stmt: ShowRegion,
492    query_engine: &QueryEngineRef,
493    catalog_manager: &CatalogManagerRef,
494    query_ctx: QueryContextRef,
495) -> Result<Output> {
496    let schema_name = if let Some(database) = stmt.database {
497        database
498    } else {
499        query_ctx.current_schema()
500    };
501
502    let filters = vec![
503        col(region_peers::TABLE_NAME).eq(lit(&stmt.table)),
504        col(region_peers::TABLE_SCHEMA).eq(lit(schema_name.clone())),
505        col(region_peers::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
506    ];
507    let projects = vec![
508        (region_peers::TABLE_NAME, "Table"),
509        (region_peers::REGION_ID, "Region"),
510        (region_peers::PEER_ID, "Peer"),
511        (region_peers::IS_LEADER, "Leader"),
512    ];
513
514    let like_field = None;
515    let sort = vec![
516        col(columns::REGION_ID).sort(true, true),
517        col(columns::PEER_ID).sort(true, true),
518    ];
519
520    query_from_information_schema_table(
521        query_engine,
522        catalog_manager,
523        query_ctx,
524        REGION_PEERS,
525        vec![],
526        projects,
527        filters,
528        like_field,
529        sort,
530        stmt.kind,
531    )
532    .await
533}
534
535/// Execute [`ShowTables`] statement and return the [`Output`] if success.
536pub async fn show_tables(
537    stmt: ShowTables,
538    query_engine: &QueryEngineRef,
539    catalog_manager: &CatalogManagerRef,
540    query_ctx: QueryContextRef,
541) -> Result<Output> {
542    let schema_name = if let Some(database) = stmt.database {
543        database
544    } else {
545        query_ctx.current_schema()
546    };
547
548    // MySQL renames `table_name` to `Tables_in_{schema}` for protocol compatibility
549    let tables_column = format!("Tables_in_{}", schema_name);
550    let projects = if stmt.full {
551        vec![
552            (tables::TABLE_NAME, tables_column.as_str()),
553            (tables::TABLE_TYPE, TABLE_TYPE_COLUMN),
554        ]
555    } else {
556        vec![(tables::TABLE_NAME, tables_column.as_str())]
557    };
558    let filters = vec![
559        col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
560        col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
561    ];
562    let like_field = Some(tables::TABLE_NAME);
563    let sort = vec![col(tables::TABLE_NAME).sort(true, true)];
564
565    // Transform the WHERE clause for backward compatibility:
566    // Replace "Tables" with "Tables_in_{schema}" to support old queries
567    let kind = match stmt.kind {
568        ShowKind::Where(mut filter) => {
569            replace_column_in_expr(&mut filter, "Tables", &tables_column);
570            ShowKind::Where(filter)
571        }
572        other => other,
573    };
574
575    query_from_information_schema_table(
576        query_engine,
577        catalog_manager,
578        query_ctx,
579        TABLES,
580        vec![],
581        projects,
582        filters,
583        like_field,
584        sort,
585        kind,
586    )
587    .await
588}
589
590/// Execute [`ShowTableStatus`] statement and return the [`Output`] if success.
591pub async fn show_table_status(
592    stmt: ShowTableStatus,
593    query_engine: &QueryEngineRef,
594    catalog_manager: &CatalogManagerRef,
595    query_ctx: QueryContextRef,
596) -> Result<Output> {
597    let schema_name = if let Some(database) = stmt.database {
598        database
599    } else {
600        query_ctx.current_schema()
601    };
602
603    // Refer to https://dev.mysql.com/doc/refman/8.4/en/show-table-status.html
604    let projects = vec![
605        (tables::TABLE_NAME, "Name"),
606        (tables::ENGINE, "Engine"),
607        (tables::VERSION, "Version"),
608        (tables::ROW_FORMAT, "Row_format"),
609        (tables::TABLE_ROWS, "Rows"),
610        (tables::AVG_ROW_LENGTH, "Avg_row_length"),
611        (tables::DATA_LENGTH, "Data_length"),
612        (tables::MAX_DATA_LENGTH, "Max_data_length"),
613        (tables::INDEX_LENGTH, "Index_length"),
614        (tables::DATA_FREE, "Data_free"),
615        (tables::AUTO_INCREMENT, "Auto_increment"),
616        (tables::CREATE_TIME, "Create_time"),
617        (tables::UPDATE_TIME, "Update_time"),
618        (tables::CHECK_TIME, "Check_time"),
619        (tables::TABLE_COLLATION, "Collation"),
620        (tables::CHECKSUM, "Checksum"),
621        (tables::CREATE_OPTIONS, "Create_options"),
622        (tables::TABLE_COMMENT, "Comment"),
623    ];
624
625    let filters = vec![
626        col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
627        col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
628    ];
629    let like_field = Some(tables::TABLE_NAME);
630    let sort = vec![col(tables::TABLE_NAME).sort(true, true)];
631
632    query_from_information_schema_table(
633        query_engine,
634        catalog_manager,
635        query_ctx,
636        TABLES,
637        vec![],
638        projects,
639        filters,
640        like_field,
641        sort,
642        stmt.kind,
643    )
644    .await
645}
646
647/// Execute `SHOW COLLATION` statement and returns the `Output` if success.
648pub async fn show_collations(
649    kind: ShowKind,
650    query_engine: &QueryEngineRef,
651    catalog_manager: &CatalogManagerRef,
652    query_ctx: QueryContextRef,
653) -> Result<Output> {
654    // Refer to https://dev.mysql.com/doc/refman/8.0/en/show-collation.html
655    let projects = vec![
656        ("collation_name", "Collation"),
657        ("character_set_name", "Charset"),
658        ("id", "Id"),
659        ("is_default", "Default"),
660        ("is_compiled", "Compiled"),
661        ("sortlen", "Sortlen"),
662    ];
663
664    let filters = vec![];
665    let like_field = Some("collation_name");
666    let sort = vec![];
667
668    query_from_information_schema_table(
669        query_engine,
670        catalog_manager,
671        query_ctx,
672        COLLATIONS,
673        vec![],
674        projects,
675        filters,
676        like_field,
677        sort,
678        kind,
679    )
680    .await
681}
682
683/// Execute `SHOW CHARSET` statement and returns the `Output` if success.
684pub async fn show_charsets(
685    kind: ShowKind,
686    query_engine: &QueryEngineRef,
687    catalog_manager: &CatalogManagerRef,
688    query_ctx: QueryContextRef,
689) -> Result<Output> {
690    // Refer to https://dev.mysql.com/doc/refman/8.0/en/show-character-set.html
691    let projects = vec![
692        ("character_set_name", "Charset"),
693        ("description", "Description"),
694        ("default_collate_name", "Default collation"),
695        ("maxlen", "Maxlen"),
696    ];
697
698    let filters = vec![];
699    let like_field = Some("character_set_name");
700    let sort = vec![];
701
702    query_from_information_schema_table(
703        query_engine,
704        catalog_manager,
705        query_ctx,
706        CHARACTER_SETS,
707        vec![],
708        projects,
709        filters,
710        like_field,
711        sort,
712        kind,
713    )
714    .await
715}
716
717pub fn show_variable(stmt: ShowVariables, query_ctx: QueryContextRef) -> Result<Output> {
718    let variable = stmt.variable.to_string().to_uppercase();
719    let value = match variable.as_str() {
720        "SYSTEM_TIME_ZONE" | "SYSTEM_TIMEZONE" => get_timezone(None).to_string(),
721        "TIME_ZONE" | "TIMEZONE" => query_ctx.timezone().to_string(),
722        "READ_PREFERENCE" => query_ctx.read_preference().to_string(),
723        "DATESTYLE" => {
724            let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style();
725            format!("{}, {}", style, order)
726        }
727        "INTERVALSTYLE" => {
728            let style = *query_ctx
729                .configuration_parameter()
730                .pg_intervalstyle_format();
731            style.to_string()
732        }
733        "MAX_EXECUTION_TIME"
734            if query_ctx.channel() == Channel::Mysql => {
735                query_ctx.query_timeout_as_millis().to_string()
736            }
737        "STATEMENT_TIMEOUT"
738            // Add time units to postgres query timeout display.
739            if query_ctx.channel() == Channel::Postgres => {
740                let mut timeout = query_ctx.query_timeout_as_millis().to_string();
741                timeout.push_str("ms");
742                timeout
743            }
744        _ => return UnsupportedVariableSnafu { name: variable }.fail(),
745    };
746    let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
747        variable,
748        ConcreteDataType::string_datatype(),
749        false,
750    )]));
751    let records = RecordBatches::try_from_columns(
752        schema,
753        vec![Arc::new(StringVector::from(vec![value])) as _],
754    )
755    .context(error::CreateRecordBatchSnafu)?;
756    Ok(Output::new_with_record_batches(records))
757}
758
759pub async fn show_status(_query_ctx: QueryContextRef) -> Result<Output> {
760    let schema = Arc::new(Schema::new(vec![
761        ColumnSchema::new("Variable_name", ConcreteDataType::string_datatype(), false),
762        ColumnSchema::new("Value", ConcreteDataType::string_datatype(), true),
763    ]));
764    let records = RecordBatches::try_from_columns(
765        schema,
766        vec![
767            Arc::new(StringVector::from(Vec::<&str>::new())) as _,
768            Arc::new(StringVector::from(Vec::<&str>::new())) as _,
769        ],
770    )
771    .context(error::CreateRecordBatchSnafu)?;
772    Ok(Output::new_with_record_batches(records))
773}
774
775pub async fn show_search_path(_query_ctx: QueryContextRef) -> Result<Output> {
776    let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
777        "search_path",
778        ConcreteDataType::string_datatype(),
779        false,
780    )]));
781    let records = RecordBatches::try_from_columns(
782        schema,
783        vec![Arc::new(StringVector::from(vec![_query_ctx.current_schema()])) as _],
784    )
785    .context(error::CreateRecordBatchSnafu)?;
786    Ok(Output::new_with_record_batches(records))
787}
788
789pub fn show_create_database(database_name: &str, options: OptionMap) -> Result<Output> {
790    let stmt = CreateDatabase {
791        name: ObjectName::from(vec![Ident::new(database_name)]),
792        if_not_exists: true,
793        options,
794    };
795    let sql = format!("{stmt}");
796    let columns = vec![
797        Arc::new(StringVector::from(vec![database_name.to_string()])) as _,
798        Arc::new(StringVector::from(vec![sql])) as _,
799    ];
800    let records =
801        RecordBatches::try_from_columns(SHOW_CREATE_DATABASE_OUTPUT_SCHEMA.clone(), columns)
802            .context(error::CreateRecordBatchSnafu)?;
803    Ok(Output::new_with_record_batches(records))
804}
805
806pub fn show_create_table(
807    table_info: TableInfoRef,
808    schema_options: Option<SchemaOptions>,
809    partitions: Option<Partitions>,
810    query_ctx: QueryContextRef,
811) -> Result<Output> {
812    let table_name = table_info.name.clone();
813
814    let quote_style = query_ctx.quote_style();
815
816    let mut stmt = create_table_stmt(&table_info, schema_options, quote_style)?;
817    stmt.partitions = partitions.map(|mut p| {
818        p.set_quote(quote_style);
819        p
820    });
821    let sql = format!("{}", stmt);
822    let columns = vec![
823        Arc::new(StringVector::from(vec![table_name])) as _,
824        Arc::new(StringVector::from(vec![sql])) as _,
825    ];
826    let records = RecordBatches::try_from_columns(SHOW_CREATE_TABLE_OUTPUT_SCHEMA.clone(), columns)
827        .context(error::CreateRecordBatchSnafu)?;
828
829    Ok(Output::new_with_record_batches(records))
830}
831
832pub fn show_create_foreign_table_for_pg(
833    table: TableRef,
834    _query_ctx: QueryContextRef,
835) -> Result<Output> {
836    let table_info = table.table_info();
837
838    let table_meta = &table_info.meta;
839    let table_name = &table_info.name;
840    let schema = &table_info.meta.schema;
841    let is_metric_engine = is_metric_engine(&table_meta.engine);
842
843    let columns = schema
844        .column_schemas()
845        .iter()
846        .filter_map(|c| {
847            if is_metric_engine && is_metric_engine_internal_column(&c.name) {
848                None
849            } else {
850                Some(format!(
851                    "\"{}\" {}",
852                    c.name,
853                    c.data_type.postgres_datatype_name()
854                ))
855            }
856        })
857        .join(",\n  ");
858
859    let sql = format!(
860        r#"CREATE FOREIGN TABLE ft_{} (
861  {}
862)
863SERVER greptimedb
864OPTIONS (table_name '{}')"#,
865        table_name, columns, table_name
866    );
867
868    let columns = vec![
869        Arc::new(StringVector::from(vec![table_name.clone()])) as _,
870        Arc::new(StringVector::from(vec![sql])) as _,
871    ];
872    let records = RecordBatches::try_from_columns(SHOW_CREATE_TABLE_OUTPUT_SCHEMA.clone(), columns)
873        .context(error::CreateRecordBatchSnafu)?;
874
875    Ok(Output::new_with_record_batches(records))
876}
877
878pub fn show_create_view(
879    view_name: ObjectName,
880    definition: &str,
881    query_ctx: QueryContextRef,
882) -> Result<Output> {
883    let mut parser_ctx =
884        ParserContext::new(query_ctx.sql_dialect(), definition).context(error::SqlSnafu)?;
885
886    let Statement::CreateView(create_view) =
887        parser_ctx.parse_statement().context(error::SqlSnafu)?
888    else {
889        // MUST be `CreateView` statement.
890        unreachable!();
891    };
892
893    let stmt = CreateView {
894        name: view_name.clone(),
895        columns: create_view.columns,
896        query: create_view.query,
897        or_replace: create_view.or_replace,
898        if_not_exists: create_view.if_not_exists,
899    };
900
901    let sql = format!("{}", stmt);
902    let columns = vec![
903        Arc::new(StringVector::from(vec![view_name.to_string()])) as _,
904        Arc::new(StringVector::from(vec![sql])) as _,
905    ];
906    let records = RecordBatches::try_from_columns(SHOW_CREATE_VIEW_OUTPUT_SCHEMA.clone(), columns)
907        .context(error::CreateRecordBatchSnafu)?;
908
909    Ok(Output::new_with_record_batches(records))
910}
911
912/// Execute [`ShowViews`] statement and return the [`Output`] if success.
913pub async fn show_views(
914    stmt: ShowViews,
915    query_engine: &QueryEngineRef,
916    catalog_manager: &CatalogManagerRef,
917    query_ctx: QueryContextRef,
918) -> Result<Output> {
919    let schema_name = if let Some(database) = stmt.database {
920        database
921    } else {
922        query_ctx.current_schema()
923    };
924
925    let projects = vec![(tables::TABLE_NAME, VIEWS_COLUMN)];
926    let filters = vec![
927        col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
928        col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
929    ];
930    let like_field = Some(tables::TABLE_NAME);
931    let sort = vec![col(tables::TABLE_NAME).sort(true, true)];
932
933    query_from_information_schema_table(
934        query_engine,
935        catalog_manager,
936        query_ctx,
937        VIEWS,
938        vec![],
939        projects,
940        filters,
941        like_field,
942        sort,
943        stmt.kind,
944    )
945    .await
946}
947
948/// Execute [`ShowFlows`] statement and return the [`Output`] if success.
949pub async fn show_flows(
950    stmt: ShowFlows,
951    query_engine: &QueryEngineRef,
952    catalog_manager: &CatalogManagerRef,
953    query_ctx: QueryContextRef,
954) -> Result<Output> {
955    let projects = vec![(flows::FLOW_NAME, FLOWS_COLUMN)];
956    let filters = vec![col(flows::TABLE_CATALOG).eq(lit(query_ctx.current_catalog()))];
957    let like_field = Some(flows::FLOW_NAME);
958    let sort = vec![col(flows::FLOW_NAME).sort(true, true)];
959
960    query_from_information_schema_table(
961        query_engine,
962        catalog_manager,
963        query_ctx,
964        FLOWS,
965        vec![],
966        projects,
967        filters,
968        like_field,
969        sort,
970        stmt.kind,
971    )
972    .await
973}
974
975#[cfg(feature = "enterprise")]
976pub async fn show_triggers(
977    stmt: sql::statements::show::trigger::ShowTriggers,
978    query_engine: &QueryEngineRef,
979    catalog_manager: &CatalogManagerRef,
980    query_ctx: QueryContextRef,
981) -> Result<Output> {
982    const TRIGGER_NAME: &str = "trigger_name";
983    const TRIGGERS_COLUMN: &str = "Triggers";
984
985    let projects = vec![(TRIGGER_NAME, TRIGGERS_COLUMN)];
986    let like_field = Some(TRIGGER_NAME);
987    let sort = vec![col(TRIGGER_NAME).sort(true, true)];
988
989    query_from_information_schema_table(
990        query_engine,
991        catalog_manager,
992        query_ctx,
993        catalog::information_schema::TRIGGERS,
994        vec![],
995        projects,
996        vec![],
997        like_field,
998        sort,
999        stmt.kind,
1000    )
1001    .await
1002}
1003
1004pub fn show_create_flow(
1005    flow_name: ObjectName,
1006    flow_val: FlowInfoValue,
1007    query_ctx: QueryContextRef,
1008) -> Result<Output> {
1009    let mut parser_ctx =
1010        ParserContext::new(query_ctx.sql_dialect(), flow_val.raw_sql()).context(error::SqlSnafu)?;
1011
1012    let query = parser_ctx.parse_statement().context(error::SqlSnafu)?;
1013
1014    // since prom ql will parse `now()` to a fixed time, we need to not use it for generating raw query
1015    let raw_query = match &query {
1016        Statement::Tql(_) => flow_val.raw_sql().clone(),
1017        _ => query.to_string(),
1018    };
1019
1020    let query = Box::new(SqlOrTql::try_from_statement(query, &raw_query).context(error::SqlSnafu)?);
1021
1022    let comment = if flow_val.comment().is_empty() {
1023        None
1024    } else {
1025        Some(flow_val.comment().clone())
1026    };
1027
1028    let stmt = CreateFlow {
1029        flow_name,
1030        sink_table_name: ObjectName::from(vec![
1031            Ident::new(&flow_val.sink_table_name().schema_name),
1032            Ident::new(&flow_val.sink_table_name().table_name),
1033        ]),
1034        // notice we don't want `OR REPLACE` and `IF NOT EXISTS` in same sql since it's unclear what to do
1035        // so we set `or_replace` to false.
1036        or_replace: false,
1037        if_not_exists: true,
1038        expire_after: flow_val.expire_after(),
1039        eval_interval: flow_val.eval_interval(),
1040        comment,
1041        flow_options: OptionMap::from_filtered_string_map(
1042            flow_val.options(),
1043            &[FlowType::FLOW_TYPE_KEY],
1044        ),
1045        query,
1046    };
1047
1048    let sql = format!("{}", stmt);
1049    let columns = vec![
1050        Arc::new(StringVector::from(vec![flow_val.flow_name().clone()])) as _,
1051        Arc::new(StringVector::from(vec![sql])) as _,
1052    ];
1053    let records = RecordBatches::try_from_columns(SHOW_CREATE_FLOW_OUTPUT_SCHEMA.clone(), columns)
1054        .context(error::CreateRecordBatchSnafu)?;
1055
1056    Ok(Output::new_with_record_batches(records))
1057}
1058
1059pub fn describe_table(table: TableRef) -> Result<Output> {
1060    let table_info = table.table_info();
1061    let columns_schemas = table_info.meta.schema.column_schemas();
1062    let columns = vec![
1063        describe_column_names(columns_schemas),
1064        describe_column_types(columns_schemas),
1065        describe_column_keys(columns_schemas, &table_info.meta.primary_key_indices),
1066        describe_column_nullables(columns_schemas),
1067        describe_column_defaults(columns_schemas),
1068        describe_column_semantic_types(columns_schemas, &table_info.meta.primary_key_indices),
1069    ];
1070    let records = RecordBatches::try_from_columns(DESCRIBE_TABLE_OUTPUT_SCHEMA.clone(), columns)
1071        .context(error::CreateRecordBatchSnafu)?;
1072    Ok(Output::new_with_record_batches(records))
1073}
1074
1075fn describe_column_names(columns_schemas: &[ColumnSchema]) -> VectorRef {
1076    Arc::new(StringVector::from_iterator(
1077        columns_schemas.iter().map(|cs| cs.name.as_str()),
1078    ))
1079}
1080
1081fn describe_column_types(columns_schemas: &[ColumnSchema]) -> VectorRef {
1082    Arc::new(StringVector::from(
1083        columns_schemas
1084            .iter()
1085            .map(|cs| cs.data_type.name())
1086            .collect::<Vec<_>>(),
1087    ))
1088}
1089
1090fn describe_column_keys(
1091    columns_schemas: &[ColumnSchema],
1092    primary_key_indices: &[usize],
1093) -> VectorRef {
1094    Arc::new(StringVector::from_iterator(
1095        columns_schemas.iter().enumerate().map(|(i, cs)| {
1096            if cs.is_time_index() || primary_key_indices.contains(&i) {
1097                PRI_KEY
1098            } else {
1099                ""
1100            }
1101        }),
1102    ))
1103}
1104
1105fn describe_column_nullables(columns_schemas: &[ColumnSchema]) -> VectorRef {
1106    Arc::new(StringVector::from_iterator(columns_schemas.iter().map(
1107        |cs| {
1108            if cs.is_nullable() { YES_STR } else { NO_STR }
1109        },
1110    )))
1111}
1112
1113fn describe_column_defaults(columns_schemas: &[ColumnSchema]) -> VectorRef {
1114    Arc::new(StringVector::from(
1115        columns_schemas
1116            .iter()
1117            .map(|cs| {
1118                cs.default_constraint()
1119                    .map_or(String::from(""), |dc| dc.to_string())
1120            })
1121            .collect::<Vec<String>>(),
1122    ))
1123}
1124
1125fn describe_column_semantic_types(
1126    columns_schemas: &[ColumnSchema],
1127    primary_key_indices: &[usize],
1128) -> VectorRef {
1129    Arc::new(StringVector::from_iterator(
1130        columns_schemas.iter().enumerate().map(|(i, cs)| {
1131            if primary_key_indices.contains(&i) {
1132                SEMANTIC_TYPE_PRIMARY_KEY
1133            } else if cs.is_time_index() {
1134                SEMANTIC_TYPE_TIME_INDEX
1135            } else {
1136                SEMANTIC_TYPE_FIELD
1137            }
1138        }),
1139    ))
1140}
1141
1142// lists files in the frontend to reduce unnecessary scan requests repeated in each datanode.
1143pub async fn prepare_file_table_files(
1144    options: &HashMap<String, String>,
1145) -> Result<(ObjectStore, Vec<String>)> {
1146    let url = options
1147        .get(FILE_TABLE_LOCATION_KEY)
1148        .context(error::MissingRequiredFieldSnafu {
1149            name: FILE_TABLE_LOCATION_KEY,
1150        })?;
1151
1152    let (dir, filename) = find_dir_and_filename(url);
1153    let source = if let Some(filename) = filename {
1154        Source::Filename(filename)
1155    } else {
1156        Source::Dir
1157    };
1158    let regex = options
1159        .get(FILE_TABLE_PATTERN_KEY)
1160        .map(|x| Regex::new(x))
1161        .transpose()
1162        .context(error::BuildRegexSnafu)?;
1163    let object_store = build_backend(url, options).context(error::BuildBackendSnafu)?;
1164    let lister = Lister::new(object_store.clone(), source, dir, regex);
1165    // If we scan files in a directory every time the database restarts,
1166    // then it might lead to a potential undefined behavior:
1167    // If a user adds a file with an incompatible schema to that directory,
1168    // it will make the external table unavailable.
1169    let files = lister
1170        .list()
1171        .await
1172        .context(error::ListObjectsSnafu)?
1173        .into_iter()
1174        .filter_map(|entry| {
1175            if entry.path().ends_with('/') {
1176                None
1177            } else {
1178                Some(entry.path().to_string())
1179            }
1180        })
1181        .collect::<Vec<_>>();
1182    Ok((object_store, files))
1183}
1184
1185pub async fn infer_file_table_schema(
1186    object_store: &ObjectStore,
1187    files: &[String],
1188    options: &HashMap<String, String>,
1189) -> Result<Schema> {
1190    let format = parse_file_table_format(options)?;
1191    let merged = infer_schemas(object_store, files, format.as_ref())
1192        .await
1193        .context(error::InferSchemaSnafu)?;
1194    Schema::try_from(merged).context(error::ConvertSchemaSnafu)
1195}
1196
1197// Converts the file column schemas to table column schemas.
1198// Returns the column schemas and the time index column name.
1199//
1200// More specifically, this function will do the following:
1201// 1. Add a default time index column if there is no time index column
1202//    in the file column schemas, or
1203// 2. If the file column schemas contain a column with name conflicts with
1204//    the default time index column, it will replace the column schema
1205//    with the default one.
1206pub fn file_column_schemas_to_table(
1207    file_column_schemas: &[ColumnSchema],
1208) -> (Vec<ColumnSchema>, String) {
1209    let mut column_schemas = file_column_schemas.to_owned();
1210    if let Some(time_index_column) = column_schemas.iter().find(|c| c.is_time_index()) {
1211        let time_index = time_index_column.name.clone();
1212        return (column_schemas, time_index);
1213    }
1214
1215    let timestamp_type = ConcreteDataType::timestamp_millisecond_datatype();
1216    let default_zero = Value::Timestamp(Timestamp::new_millisecond(0));
1217    let timestamp_column_schema = ColumnSchema::new(greptime_timestamp(), timestamp_type, false)
1218        .with_time_index(true)
1219        .with_default_constraint(Some(ColumnDefaultConstraint::Value(default_zero)))
1220        .unwrap();
1221
1222    if let Some(column_schema) = column_schemas
1223        .iter_mut()
1224        .find(|column_schema| column_schema.name == greptime_timestamp())
1225    {
1226        // Replace the column schema with the default one
1227        *column_schema = timestamp_column_schema;
1228    } else {
1229        column_schemas.push(timestamp_column_schema);
1230    }
1231
1232    (column_schemas, greptime_timestamp().to_string())
1233}
1234
1235/// This function checks if the column schemas from a file can be matched with
1236/// the column schemas of a table.
1237///
1238/// More specifically, for each column seen in the table schema,
1239/// - If the same column does exist in the file schema, it checks if the data
1240///   type of the file column can be casted into the form of the table column.
1241/// - If the same column does not exist in the file schema, it checks if the
1242///   table column is nullable or has a default constraint.
1243pub fn check_file_to_table_schema_compatibility(
1244    file_column_schemas: &[ColumnSchema],
1245    table_column_schemas: &[ColumnSchema],
1246) -> Result<()> {
1247    let file_schemas_map = file_column_schemas
1248        .iter()
1249        .map(|s| (s.name.clone(), s))
1250        .collect::<HashMap<_, _>>();
1251
1252    for table_column in table_column_schemas {
1253        if let Some(file_column) = file_schemas_map.get(&table_column.name) {
1254            // TODO(zhongzc): a temporary solution, we should use `can_cast_to` once it's ready.
1255            ensure!(
1256                file_column
1257                    .data_type
1258                    .can_arrow_type_cast_to(&table_column.data_type),
1259                error::ColumnSchemaIncompatibleSnafu {
1260                    column: table_column.name.clone(),
1261                    file_type: file_column.data_type.clone(),
1262                    table_type: table_column.data_type.clone(),
1263                }
1264            );
1265        } else {
1266            ensure!(
1267                table_column.is_nullable() || table_column.default_constraint().is_some(),
1268                error::ColumnSchemaNoDefaultSnafu {
1269                    column: table_column.name.clone(),
1270                }
1271            );
1272        }
1273    }
1274
1275    Ok(())
1276}
1277
1278fn parse_file_table_format(options: &HashMap<String, String>) -> Result<Box<dyn FileFormat>> {
1279    Ok(
1280        match Format::try_from(options).context(error::ParseFileFormatSnafu)? {
1281            Format::Csv(format) => Box::new(format),
1282            Format::Json(format) => Box::new(format),
1283            Format::Parquet(format) => Box::new(format),
1284            Format::Orc(format) => Box::new(format),
1285        },
1286    )
1287}
1288
1289pub async fn show_processlist(
1290    stmt: ShowProcessList,
1291    query_engine: &QueryEngineRef,
1292    catalog_manager: &CatalogManagerRef,
1293    query_ctx: QueryContextRef,
1294) -> Result<Output> {
1295    let projects = if stmt.full {
1296        vec![
1297            (process_list::ID, "Id"),
1298            (process_list::CATALOG, "Catalog"),
1299            (process_list::SCHEMAS, "Schema"),
1300            (process_list::CLIENT, "Client"),
1301            (process_list::FRONTEND, "Frontend"),
1302            (process_list::START_TIMESTAMP, "Start Time"),
1303            (process_list::ELAPSED_TIME, "Elapsed Time"),
1304            (process_list::QUERY, "Query"),
1305        ]
1306    } else {
1307        vec![
1308            (process_list::ID, "Id"),
1309            (process_list::CATALOG, "Catalog"),
1310            (process_list::QUERY, "Query"),
1311            (process_list::ELAPSED_TIME, "Elapsed Time"),
1312        ]
1313    };
1314
1315    let filters = vec![];
1316    let like_field = None;
1317    let sort = vec![col("id").sort(true, true)];
1318    query_from_information_schema_table(
1319        query_engine,
1320        catalog_manager,
1321        query_ctx.clone(),
1322        "process_list",
1323        vec![],
1324        projects.clone(),
1325        filters,
1326        like_field,
1327        sort,
1328        ShowKind::All,
1329    )
1330    .await
1331}
1332
1333#[cfg(test)]
1334mod test {
1335    use std::sync::Arc;
1336
1337    use common_query::{Output, OutputData};
1338    use common_recordbatch::{RecordBatch, RecordBatches};
1339    use common_time::Timezone;
1340    use common_time::timestamp::TimeUnit;
1341    use datatypes::prelude::ConcreteDataType;
1342    use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema, SchemaRef};
1343    use datatypes::vectors::{StringVector, TimestampMillisecondVector, UInt32Vector, VectorRef};
1344    use session::context::QueryContextBuilder;
1345    use snafu::ResultExt;
1346    use sql::ast::{Ident, ObjectName};
1347    use sql::statements::show::ShowVariables;
1348    use table::TableRef;
1349    use table::test_util::MemTable;
1350
1351    use super::show_variable;
1352    use crate::error;
1353    use crate::error::Result;
1354    use crate::sql::{
1355        DESCRIBE_TABLE_OUTPUT_SCHEMA, NO_STR, SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_TIME_INDEX,
1356        YES_STR, describe_table,
1357    };
1358
1359    #[test]
1360    fn test_describe_table_multiple_columns() -> Result<()> {
1361        let table_name = "test_table";
1362        let schema = vec![
1363            ColumnSchema::new("t1", ConcreteDataType::uint32_datatype(), true),
1364            ColumnSchema::new(
1365                "t2",
1366                ConcreteDataType::timestamp_datatype(TimeUnit::Millisecond),
1367                false,
1368            )
1369            .with_default_constraint(Some(ColumnDefaultConstraint::Function(String::from(
1370                "current_timestamp()",
1371            ))))
1372            .unwrap()
1373            .with_time_index(true),
1374        ];
1375        let data = vec![
1376            Arc::new(UInt32Vector::from_slice([0])) as _,
1377            Arc::new(TimestampMillisecondVector::from_slice([0])) as _,
1378        ];
1379        let expected_columns = vec![
1380            Arc::new(StringVector::from(vec!["t1", "t2"])) as _,
1381            Arc::new(StringVector::from(vec!["UInt32", "TimestampMillisecond"])) as _,
1382            Arc::new(StringVector::from(vec!["", "PRI"])) as _,
1383            Arc::new(StringVector::from(vec![YES_STR, NO_STR])) as _,
1384            Arc::new(StringVector::from(vec!["", "current_timestamp()"])) as _,
1385            Arc::new(StringVector::from(vec![
1386                SEMANTIC_TYPE_FIELD,
1387                SEMANTIC_TYPE_TIME_INDEX,
1388            ])) as _,
1389        ];
1390
1391        describe_table_test_by_schema(table_name, schema, data, expected_columns)
1392    }
1393
1394    fn describe_table_test_by_schema(
1395        table_name: &str,
1396        schema: Vec<ColumnSchema>,
1397        data: Vec<VectorRef>,
1398        expected_columns: Vec<VectorRef>,
1399    ) -> Result<()> {
1400        let table_schema = SchemaRef::new(Schema::new(schema));
1401        let table = prepare_describe_table(table_name, table_schema, data);
1402
1403        let expected =
1404            RecordBatches::try_from_columns(DESCRIBE_TABLE_OUTPUT_SCHEMA.clone(), expected_columns)
1405                .context(error::CreateRecordBatchSnafu)?;
1406
1407        if let OutputData::RecordBatches(res) = describe_table(table)?.data {
1408            assert_eq!(res.take(), expected.take());
1409        } else {
1410            panic!("describe table must return record batch");
1411        }
1412
1413        Ok(())
1414    }
1415
1416    fn prepare_describe_table(
1417        table_name: &str,
1418        table_schema: SchemaRef,
1419        data: Vec<VectorRef>,
1420    ) -> TableRef {
1421        let record_batch = RecordBatch::new(table_schema, data).unwrap();
1422        MemTable::table(table_name, record_batch)
1423    }
1424
1425    #[test]
1426    fn test_show_variable() {
1427        assert_eq!(
1428            exec_show_variable("SYSTEM_TIME_ZONE", "Asia/Shanghai").unwrap(),
1429            "UTC"
1430        );
1431        assert_eq!(
1432            exec_show_variable("SYSTEM_TIMEZONE", "Asia/Shanghai").unwrap(),
1433            "UTC"
1434        );
1435        assert_eq!(
1436            exec_show_variable("TIME_ZONE", "Asia/Shanghai").unwrap(),
1437            "Asia/Shanghai"
1438        );
1439        assert_eq!(
1440            exec_show_variable("TIMEZONE", "Asia/Shanghai").unwrap(),
1441            "Asia/Shanghai"
1442        );
1443        assert!(exec_show_variable("TIME ZONE", "Asia/Shanghai").is_err());
1444        assert!(exec_show_variable("SYSTEM TIME ZONE", "Asia/Shanghai").is_err());
1445    }
1446
1447    fn exec_show_variable(variable: &str, tz: &str) -> Result<String> {
1448        let stmt = ShowVariables {
1449            variable: ObjectName::from(vec![Ident::new(variable)]),
1450        };
1451        let ctx = Arc::new(
1452            QueryContextBuilder::default()
1453                .timezone(Timezone::from_tz_string(tz).unwrap())
1454                .build(),
1455        );
1456        match show_variable(stmt, ctx) {
1457            Ok(Output {
1458                data: OutputData::RecordBatches(record),
1459                ..
1460            }) => {
1461                let record = record.take().first().cloned().unwrap();
1462                Ok(record.iter_column_as_string(0).next().unwrap().unwrap())
1463            }
1464            Ok(_) => unreachable!(),
1465            Err(e) => Err(e),
1466        }
1467    }
1468}