mod show_create_table;

use std::collections::HashMap;
use std::sync::Arc;

use catalog::information_schema::{
    columns, flows, key_column_usage, region_peers, schemata, tables, CHARACTER_SETS, COLLATIONS,
    COLUMNS, FLOWS, KEY_COLUMN_USAGE, REGION_PEERS, SCHEMATA, TABLES, VIEWS,
};
use catalog::CatalogManagerRef;
use common_catalog::consts::{
    INFORMATION_SCHEMA_NAME, SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_PRIMARY_KEY,
    SEMANTIC_TYPE_TIME_INDEX,
};
use common_catalog::format_full_table_name;
use common_datasource::file_format::{infer_schemas, FileFormat, Format};
use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::build_backend;
use common_datasource::util::find_dir_and_filename;
use common_meta::key::flow::flow_info::FlowInfoValue;
use common_meta::SchemaOptions;
use common_query::prelude::GREPTIME_TIMESTAMP;
use common_query::Output;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::RecordBatches;
use common_time::timezone::get_timezone;
use common_time::Timestamp;
use datafusion::common::ScalarValue;
use datafusion::prelude::SessionContext;
use datafusion_expr::expr::WildcardOptions;
use datafusion_expr::{case, col, lit, Expr, SortExpr};
use datatypes::prelude::*;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, RawSchema, Schema};
use datatypes::vectors::StringVector;
use itertools::Itertools;
use object_store::ObjectStore;
use once_cell::sync::Lazy;
use regex::Regex;
use session::context::{Channel, QueryContextRef};
pub use show_create_table::create_table_stmt;
use snafu::{ensure, OptionExt, ResultExt};
use sql::ast::Ident;
use sql::parser::ParserContext;
use sql::statements::create::{CreateDatabase, CreateFlow, CreateView, Partitions};
use sql::statements::show::{
    ShowColumns, ShowDatabases, ShowFlows, ShowIndex, ShowKind, ShowRegion, ShowTableStatus,
    ShowTables, ShowVariables, ShowViews,
};
use sql::statements::statement::Statement;
use sql::statements::OptionMap;
use sqlparser::ast::ObjectName;
use store_api::metric_engine_consts::{is_metric_engine, is_metric_engine_internal_column};
use table::requests::{FILE_TABLE_LOCATION_KEY, FILE_TABLE_PATTERN_KEY};
use table::TableRef;

use crate::dataframe::DataFrame;
use crate::error::{self, Result, UnsupportedVariableSnafu};
use crate::planner::DfLogicalPlanner;
use crate::QueryEngineRef;

const SCHEMAS_COLUMN: &str = "Database";
const OPTIONS_COLUMN: &str = "Options";
const TABLES_COLUMN: &str = "Tables";
const VIEWS_COLUMN: &str = "Views";
const FLOWS_COLUMN: &str = "Flows";
const FIELD_COLUMN: &str = "Field";
const TABLE_TYPE_COLUMN: &str = "Table_type";
const COLUMN_NAME_COLUMN: &str = "Column";
const COLUMN_GREPTIME_TYPE_COLUMN: &str = "Greptime_type";
const COLUMN_TYPE_COLUMN: &str = "Type";
const COLUMN_KEY_COLUMN: &str = "Key";
const COLUMN_EXTRA_COLUMN: &str = "Extra";
const COLUMN_PRIVILEGES_COLUMN: &str = "Privileges";
const COLUMN_COLLATION_COLUMN: &str = "Collation";
const COLUMN_NULLABLE_COLUMN: &str = "Null";
const COLUMN_DEFAULT_COLUMN: &str = "Default";
const COLUMN_COMMENT_COLUMN: &str = "Comment";
const COLUMN_SEMANTIC_TYPE_COLUMN: &str = "Semantic Type";

const YES_STR: &str = "YES";
const NO_STR: &str = "NO";
const PRI_KEY: &str = "PRI";
const TIME_INDEX: &str = "TIME INDEX";

const INDEX_TABLE_COLUMN: &str = "Table";
const INDEX_NON_UNIQUE_COLUMN: &str = "Non_unique";
const INDEX_CARDINALITY_COLUMN: &str = "Cardinality";
const INDEX_SUB_PART_COLUMN: &str = "Sub_part";
const INDEX_PACKED_COLUMN: &str = "Packed";
const INDEX_INDEX_TYPE_COLUMN: &str = "Index_type";
const INDEX_COMMENT_COLUMN: &str = "Index_comment";
const INDEX_VISIBLE_COLUMN: &str = "Visible";
const INDEX_EXPRESSION_COLUMN: &str = "Expression";
const INDEX_KEY_NAME_COLUMN: &str = "Key_name";
const INDEX_SEQ_IN_INDEX_COLUMN: &str = "Seq_in_index";
const INDEX_COLUMN_NAME_COLUMN: &str = "Column_name";

static DESCRIBE_TABLE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
    Arc::new(Schema::new(vec![
        ColumnSchema::new(
            COLUMN_NAME_COLUMN,
            ConcreteDataType::string_datatype(),
            false,
        ),
        ColumnSchema::new(
            COLUMN_TYPE_COLUMN,
            ConcreteDataType::string_datatype(),
            false,
        ),
        ColumnSchema::new(COLUMN_KEY_COLUMN, ConcreteDataType::string_datatype(), true),
        ColumnSchema::new(
            COLUMN_NULLABLE_COLUMN,
            ConcreteDataType::string_datatype(),
            false,
        ),
        ColumnSchema::new(
            COLUMN_DEFAULT_COLUMN,
            ConcreteDataType::string_datatype(),
            false,
        ),
        ColumnSchema::new(
            COLUMN_SEMANTIC_TYPE_COLUMN,
            ConcreteDataType::string_datatype(),
            false,
        ),
    ]))
});

static SHOW_CREATE_DATABASE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
    Arc::new(Schema::new(vec![
        ColumnSchema::new("Database", ConcreteDataType::string_datatype(), false),
        ColumnSchema::new(
            "Create Database",
            ConcreteDataType::string_datatype(),
            false,
        ),
    ]))
});

static SHOW_CREATE_TABLE_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
    Arc::new(Schema::new(vec![
        ColumnSchema::new("Table", ConcreteDataType::string_datatype(), false),
        ColumnSchema::new("Create Table", ConcreteDataType::string_datatype(), false),
    ]))
});

static SHOW_CREATE_FLOW_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
    Arc::new(Schema::new(vec![
        ColumnSchema::new("Flow", ConcreteDataType::string_datatype(), false),
        ColumnSchema::new("Create Flow", ConcreteDataType::string_datatype(), false),
    ]))
});

static SHOW_CREATE_VIEW_OUTPUT_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
    Arc::new(Schema::new(vec![
        ColumnSchema::new("View", ConcreteDataType::string_datatype(), false),
        ColumnSchema::new("Create View", ConcreteDataType::string_datatype(), false),
    ]))
});

fn null() -> Expr {
    lit(ScalarValue::Null)
}

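/// Executes `SHOW [FULL] DATABASES` by querying `information_schema.schemata`,
/// scoped to the current catalog and sorted by schema name.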
pub async fn show_databases(
    stmt: ShowDatabases,
    query_engine: &QueryEngineRef,
    catalog_manager: &CatalogManagerRef,
    query_ctx: QueryContextRef,
) -> Result<Output> {
    let projects = if stmt.full {
        vec![
            (schemata::SCHEMA_NAME, SCHEMAS_COLUMN),
            (schemata::SCHEMA_OPTS, OPTIONS_COLUMN),
        ]
    } else {
        vec![(schemata::SCHEMA_NAME, SCHEMAS_COLUMN)]
    };

    let filters = vec![col(schemata::CATALOG_NAME).eq(lit(query_ctx.current_catalog()))];
    let like_field = Some(schemata::SCHEMA_NAME);
    let sort = vec![col(schemata::SCHEMA_NAME).sort(true, true)];

    query_from_information_schema_table(
        query_engine,
        catalog_manager,
        query_ctx,
        SCHEMATA,
        vec![],
        projects,
        filters,
        like_field,
        sort,
        stmt.kind,
    )
    .await
}

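/// Shared driver for the `SHOW ...` statements: reads an `information_schema`
/// table, applies the optional `select` expressions, the `filters`, a `LIKE`
/// pattern on `like_field`, sorting, and the `(column, alias)` projections,
/// then evaluates the statement's `WHERE` clause (if any) and streams the
/// result.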
#[allow(clippy::too_many_arguments)]
async fn query_from_information_schema_table(
    query_engine: &QueryEngineRef,
    catalog_manager: &CatalogManagerRef,
    query_ctx: QueryContextRef,
    table_name: &str,
    select: Vec<Expr>,
    projects: Vec<(&str, &str)>,
    filters: Vec<Expr>,
    like_field: Option<&str>,
    sort: Vec<SortExpr>,
    kind: ShowKind,
) -> Result<Output> {
    let table = catalog_manager
        .table(
            query_ctx.current_catalog(),
            INFORMATION_SCHEMA_NAME,
            table_name,
            Some(&query_ctx),
        )
        .await
        .context(error::CatalogSnafu)?
        .with_context(|| error::TableNotFoundSnafu {
            table: format_full_table_name(
                query_ctx.current_catalog(),
                INFORMATION_SCHEMA_NAME,
                table_name,
            ),
        })?;

    let DataFrame::DataFusion(dataframe) = query_engine.read_table(table)?;

    let dataframe = if select.is_empty() {
        dataframe
    } else {
        dataframe.select(select).context(error::PlanSqlSnafu)?
    };

    let dataframe = filters.into_iter().try_fold(dataframe, |df, expr| {
        df.filter(expr).context(error::PlanSqlSnafu)
    })?;

    let dataframe = if let (ShowKind::Like(ident), Some(field)) = (&kind, like_field) {
        dataframe
            .filter(col(field).like(lit(ident.value.clone())))
            .context(error::PlanSqlSnafu)?
    } else {
        dataframe
    };

    let dataframe = dataframe
        .sort(sort)
        .context(error::PlanSqlSnafu)?
        .select_columns(&projects.iter().map(|(c, _)| *c).collect::<Vec<_>>())
        .context(error::PlanSqlSnafu)?;

    let dataframe = projects
        .into_iter()
        .try_fold(dataframe, |df, (column, renamed_column)| {
            df.with_column_renamed(column, renamed_column)
                .context(error::PlanSqlSnafu)
        })?;

    let dataframe = match kind {
        ShowKind::All | ShowKind::Like(_) => dataframe,
        ShowKind::Where(filter) => {
            let view = dataframe.into_view();
            let dataframe = SessionContext::new_with_state(
                query_engine
                    .engine_context(query_ctx.clone())
                    .state()
                    .clone(),
            )
            .read_table(view)?;

            let planner = query_engine.planner();
            let planner = planner
                .as_any()
                .downcast_ref::<DfLogicalPlanner>()
                .expect("Must be the datafusion planner");

            let filter = planner
                .sql_to_expr(filter, dataframe.schema(), false, query_ctx)
                .await?;

            dataframe.filter(filter).context(error::PlanSqlSnafu)?
        }
    };

    let stream = dataframe.execute_stream().await?;

    Ok(Output::new_with_stream(Box::pin(
        RecordBatchStreamAdapter::try_new(stream).context(error::CreateRecordBatchSnafu)?,
    )))
}

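/// Executes `SHOW [FULL] COLUMNS` by querying `information_schema.columns`
/// for the given table in the given (or current) schema.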
pub async fn show_columns(
    stmt: ShowColumns,
    query_engine: &QueryEngineRef,
    catalog_manager: &CatalogManagerRef,
    query_ctx: QueryContextRef,
) -> Result<Output> {
    let schema_name = if let Some(database) = stmt.database {
        database
    } else {
        query_ctx.current_schema()
    };

    let projects = if stmt.full {
        vec![
            (columns::COLUMN_NAME, FIELD_COLUMN),
            (columns::DATA_TYPE, COLUMN_TYPE_COLUMN),
            (columns::COLLATION_NAME, COLUMN_COLLATION_COLUMN),
            (columns::IS_NULLABLE, COLUMN_NULLABLE_COLUMN),
            (columns::COLUMN_KEY, COLUMN_KEY_COLUMN),
            (columns::COLUMN_DEFAULT, COLUMN_DEFAULT_COLUMN),
            (columns::COLUMN_COMMENT, COLUMN_COMMENT_COLUMN),
            (columns::PRIVILEGES, COLUMN_PRIVILEGES_COLUMN),
            (columns::EXTRA, COLUMN_EXTRA_COLUMN),
            (columns::GREPTIME_DATA_TYPE, COLUMN_GREPTIME_TYPE_COLUMN),
        ]
    } else {
        vec![
            (columns::COLUMN_NAME, FIELD_COLUMN),
            (columns::DATA_TYPE, COLUMN_TYPE_COLUMN),
            (columns::IS_NULLABLE, COLUMN_NULLABLE_COLUMN),
            (columns::COLUMN_KEY, COLUMN_KEY_COLUMN),
            (columns::COLUMN_DEFAULT, COLUMN_DEFAULT_COLUMN),
            (columns::EXTRA, COLUMN_EXTRA_COLUMN),
            (columns::GREPTIME_DATA_TYPE, COLUMN_GREPTIME_TYPE_COLUMN),
        ]
    };

    let filters = vec![
        col(columns::TABLE_NAME).eq(lit(&stmt.table)),
        col(columns::TABLE_SCHEMA).eq(lit(schema_name.clone())),
        col(columns::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
    ];
    let like_field = Some(columns::COLUMN_NAME);
    let sort = vec![col(columns::COLUMN_NAME).sort(true, true)];

    query_from_information_schema_table(
        query_engine,
        catalog_manager,
        query_ctx,
        COLUMNS,
        vec![],
        projects,
        filters,
        like_field,
        sort,
        stmt.kind,
    )
    .await
}

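/// Executes `SHOW INDEX` by querying `information_schema.key_column_usage`,
/// filling the MySQL-style output columns that GreptimeDB does not track
/// (`Cardinality`, `Sub_part`, `Packed`, ...) with fixed or null values.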
pub async fn show_index(
    stmt: ShowIndex,
    query_engine: &QueryEngineRef,
    catalog_manager: &CatalogManagerRef,
    query_ctx: QueryContextRef,
) -> Result<Output> {
    let schema_name = if let Some(database) = stmt.database {
        database
    } else {
        query_ctx.current_schema()
    };

    let select = vec![
        lit(1).alias(INDEX_NON_UNIQUE_COLUMN),
        lit("A").alias(COLUMN_COLLATION_COLUMN),
        null().alias(INDEX_CARDINALITY_COLUMN),
        null().alias(INDEX_SUB_PART_COLUMN),
        null().alias(INDEX_PACKED_COLUMN),
        case(col(key_column_usage::CONSTRAINT_NAME))
            .when(lit(TIME_INDEX), lit(NO_STR))
            .otherwise(lit(YES_STR))
            .context(error::PlanSqlSnafu)?
            .alias(COLUMN_NULLABLE_COLUMN),
        lit("").alias(COLUMN_COMMENT_COLUMN),
        lit("").alias(INDEX_COMMENT_COLUMN),
        lit(YES_STR).alias(INDEX_VISIBLE_COLUMN),
        null().alias(INDEX_EXPRESSION_COLUMN),
        Expr::Wildcard {
            qualifier: None,
            options: Box::new(WildcardOptions::default()),
        },
    ];

    let projects = vec![
        (key_column_usage::TABLE_NAME, INDEX_TABLE_COLUMN),
        (INDEX_NON_UNIQUE_COLUMN, INDEX_NON_UNIQUE_COLUMN),
        (key_column_usage::CONSTRAINT_NAME, INDEX_KEY_NAME_COLUMN),
        (
            key_column_usage::ORDINAL_POSITION,
            INDEX_SEQ_IN_INDEX_COLUMN,
        ),
        (key_column_usage::COLUMN_NAME, INDEX_COLUMN_NAME_COLUMN),
        (COLUMN_COLLATION_COLUMN, COLUMN_COLLATION_COLUMN),
        (INDEX_CARDINALITY_COLUMN, INDEX_CARDINALITY_COLUMN),
        (INDEX_SUB_PART_COLUMN, INDEX_SUB_PART_COLUMN),
        (INDEX_PACKED_COLUMN, INDEX_PACKED_COLUMN),
        (COLUMN_NULLABLE_COLUMN, COLUMN_NULLABLE_COLUMN),
        (
            key_column_usage::GREPTIME_INDEX_TYPE,
            INDEX_INDEX_TYPE_COLUMN,
        ),
        (COLUMN_COMMENT_COLUMN, COLUMN_COMMENT_COLUMN),
        (INDEX_COMMENT_COLUMN, INDEX_COMMENT_COLUMN),
        (INDEX_VISIBLE_COLUMN, INDEX_VISIBLE_COLUMN),
        (INDEX_EXPRESSION_COLUMN, INDEX_EXPRESSION_COLUMN),
    ];

    let filters = vec![
        col(key_column_usage::TABLE_NAME).eq(lit(&stmt.table)),
        col(key_column_usage::TABLE_SCHEMA).eq(lit(schema_name.clone())),
        col(key_column_usage::REAL_TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
    ];
    let like_field = None;
    let sort = vec![col(key_column_usage::COLUMN_NAME).sort(true, true)];

    query_from_information_schema_table(
        query_engine,
        catalog_manager,
        query_ctx,
        KEY_COLUMN_USAGE,
        select,
        projects,
        filters,
        like_field,
        sort,
        stmt.kind,
    )
    .await
}

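/// Executes `SHOW REGION` by querying `information_schema.region_peers`
/// for the region distribution of the given table.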
pub async fn show_region(
    stmt: ShowRegion,
    query_engine: &QueryEngineRef,
    catalog_manager: &CatalogManagerRef,
    query_ctx: QueryContextRef,
) -> Result<Output> {
    let schema_name = if let Some(database) = stmt.database {
        database
    } else {
        query_ctx.current_schema()
    };

    let filters = vec![
        col(region_peers::TABLE_NAME).eq(lit(&stmt.table)),
        col(region_peers::TABLE_SCHEMA).eq(lit(schema_name.clone())),
        col(region_peers::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
    ];
    let projects = vec![
        (region_peers::TABLE_NAME, "Table"),
        (region_peers::REGION_ID, "Region"),
        (region_peers::PEER_ID, "Peer"),
        (region_peers::IS_LEADER, "Leader"),
    ];

    let like_field = None;
    let sort = vec![
        col(region_peers::REGION_ID).sort(true, true),
        col(region_peers::PEER_ID).sort(true, true),
    ];

    query_from_information_schema_table(
        query_engine,
        catalog_manager,
        query_ctx,
        REGION_PEERS,
        vec![],
        projects,
        filters,
        like_field,
        sort,
        stmt.kind,
    )
    .await
}

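/// Executes `SHOW [FULL] TABLES` by querying `information_schema.tables`,
/// including the table type when `FULL` is specified.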
pub async fn show_tables(
    stmt: ShowTables,
    query_engine: &QueryEngineRef,
    catalog_manager: &CatalogManagerRef,
    query_ctx: QueryContextRef,
) -> Result<Output> {
    let schema_name = if let Some(database) = stmt.database {
        database
    } else {
        query_ctx.current_schema()
    };

    let projects = if stmt.full {
        vec![
            (tables::TABLE_NAME, TABLES_COLUMN),
            (tables::TABLE_TYPE, TABLE_TYPE_COLUMN),
        ]
    } else {
        vec![(tables::TABLE_NAME, TABLES_COLUMN)]
    };
    let filters = vec![
        col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
        col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
    ];
    let like_field = Some(tables::TABLE_NAME);
    let sort = vec![col(tables::TABLE_NAME).sort(true, true)];

    query_from_information_schema_table(
        query_engine,
        catalog_manager,
        query_ctx,
        TABLES,
        vec![],
        projects,
        filters,
        like_field,
        sort,
        stmt.kind,
    )
    .await
}

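/// Executes `SHOW TABLE STATUS` by querying `information_schema.tables` and
/// renaming its columns to the MySQL-compatible output layout.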
pub async fn show_table_status(
    stmt: ShowTableStatus,
    query_engine: &QueryEngineRef,
    catalog_manager: &CatalogManagerRef,
    query_ctx: QueryContextRef,
) -> Result<Output> {
    let schema_name = if let Some(database) = stmt.database {
        database
    } else {
        query_ctx.current_schema()
    };

    let projects = vec![
        (tables::TABLE_NAME, "Name"),
        (tables::ENGINE, "Engine"),
        (tables::VERSION, "Version"),
        (tables::ROW_FORMAT, "Row_format"),
        (tables::TABLE_ROWS, "Rows"),
        (tables::AVG_ROW_LENGTH, "Avg_row_length"),
        (tables::DATA_LENGTH, "Data_length"),
        (tables::MAX_DATA_LENGTH, "Max_data_length"),
        (tables::INDEX_LENGTH, "Index_length"),
        (tables::DATA_FREE, "Data_free"),
        (tables::AUTO_INCREMENT, "Auto_increment"),
        (tables::CREATE_TIME, "Create_time"),
        (tables::UPDATE_TIME, "Update_time"),
        (tables::CHECK_TIME, "Check_time"),
        (tables::TABLE_COLLATION, "Collation"),
        (tables::CHECKSUM, "Checksum"),
        (tables::CREATE_OPTIONS, "Create_options"),
        (tables::TABLE_COMMENT, "Comment"),
    ];

    let filters = vec![
        col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
        col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
    ];
    let like_field = Some(tables::TABLE_NAME);
    let sort = vec![col(tables::TABLE_NAME).sort(true, true)];

    query_from_information_schema_table(
        query_engine,
        catalog_manager,
        query_ctx,
        TABLES,
        vec![],
        projects,
        filters,
        like_field,
        sort,
        stmt.kind,
    )
    .await
}

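/// Executes `SHOW COLLATION` by querying `information_schema.collations`.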
pub async fn show_collations(
    kind: ShowKind,
    query_engine: &QueryEngineRef,
    catalog_manager: &CatalogManagerRef,
    query_ctx: QueryContextRef,
) -> Result<Output> {
    let projects = vec![
        ("collation_name", "Collation"),
        ("character_set_name", "Charset"),
        ("id", "Id"),
        ("is_default", "Default"),
        ("is_compiled", "Compiled"),
        ("sortlen", "Sortlen"),
    ];

    let filters = vec![];
    let like_field = Some("collation_name");
    let sort = vec![];

    query_from_information_schema_table(
        query_engine,
        catalog_manager,
        query_ctx,
        COLLATIONS,
        vec![],
        projects,
        filters,
        like_field,
        sort,
        kind,
    )
    .await
}

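/// Executes `SHOW CHARACTER SET` by querying `information_schema.character_sets`.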
pub async fn show_charsets(
    kind: ShowKind,
    query_engine: &QueryEngineRef,
    catalog_manager: &CatalogManagerRef,
    query_ctx: QueryContextRef,
) -> Result<Output> {
    let projects = vec![
        ("character_set_name", "Charset"),
        ("description", "Description"),
        ("default_collate_name", "Default collation"),
        ("maxlen", "Maxlen"),
    ];

    let filters = vec![];
    let like_field = Some("character_set_name");
    let sort = vec![];

    query_from_information_schema_table(
        query_engine,
        catalog_manager,
        query_ctx,
        CHARACTER_SETS,
        vec![],
        projects,
        filters,
        like_field,
        sort,
        kind,
    )
    .await
}

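/// Executes `SHOW VARIABLES <variable>` for the session and system variables
/// supported here; `MAX_EXECUTION_TIME` is MySQL-only, `STATEMENT_TIMEOUT` is
/// PostgreSQL-only, and anything else fails with `UnsupportedVariable`.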
pub fn show_variable(stmt: ShowVariables, query_ctx: QueryContextRef) -> Result<Output> {
    let variable = stmt.variable.to_string().to_uppercase();
    let value = match variable.as_str() {
        "SYSTEM_TIME_ZONE" | "SYSTEM_TIMEZONE" => get_timezone(None).to_string(),
        "TIME_ZONE" | "TIMEZONE" => query_ctx.timezone().to_string(),
        "READ_PREFERENCE" => query_ctx.read_preference().to_string(),
        "DATESTYLE" => {
            let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style();
            format!("{}, {}", style, order)
        }
        "MAX_EXECUTION_TIME" => {
            if query_ctx.channel() == Channel::Mysql {
                query_ctx.query_timeout_as_millis().to_string()
            } else {
                return UnsupportedVariableSnafu { name: variable }.fail();
            }
        }
        "STATEMENT_TIMEOUT" => {
            if query_ctx.channel() == Channel::Postgres {
                let mut timeout = query_ctx.query_timeout_as_millis().to_string();
                timeout.push_str("ms");
                timeout
            } else {
                return UnsupportedVariableSnafu { name: variable }.fail();
            }
        }
        _ => return UnsupportedVariableSnafu { name: variable }.fail(),
    };
    let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
        variable,
        ConcreteDataType::string_datatype(),
        false,
    )]));
    let records = RecordBatches::try_from_columns(
        schema,
        vec![Arc::new(StringVector::from(vec![value])) as _],
    )
    .context(error::CreateRecordBatchSnafu)?;
    Ok(Output::new_with_record_batches(records))
}

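/// Executes `SHOW STATUS`; currently returns an empty set of
/// `Variable_name`/`Value` pairs.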
pub async fn show_status(_query_ctx: QueryContextRef) -> Result<Output> {
    let schema = Arc::new(Schema::new(vec![
        ColumnSchema::new("Variable_name", ConcreteDataType::string_datatype(), false),
        ColumnSchema::new("Value", ConcreteDataType::string_datatype(), true),
    ]));
    let records = RecordBatches::try_from_columns(
        schema,
        vec![
            Arc::new(StringVector::from(Vec::<&str>::new())) as _,
            Arc::new(StringVector::from(Vec::<&str>::new())) as _,
        ],
    )
    .context(error::CreateRecordBatchSnafu)?;
    Ok(Output::new_with_record_batches(records))
}

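/// Executes the PostgreSQL `SHOW search_path`, returning the current schema.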
pub async fn show_search_path(_query_ctx: QueryContextRef) -> Result<Output> {
    let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
        "search_path",
        ConcreteDataType::string_datatype(),
        false,
    )]));
    let records = RecordBatches::try_from_columns(
        schema,
        vec![Arc::new(StringVector::from(vec![_query_ctx.current_schema()])) as _],
    )
    .context(error::CreateRecordBatchSnafu)?;
    Ok(Output::new_with_record_batches(records))
}

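/// Builds the `SHOW CREATE DATABASE` output by rendering a `CREATE DATABASE`
/// statement from the database name and its options.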
pub fn show_create_database(database_name: &str, options: OptionMap) -> Result<Output> {
    let stmt = CreateDatabase {
        name: ObjectName(vec![Ident::new(database_name)]),
        if_not_exists: true,
        options,
    };
    let sql = format!("{stmt}");
    let columns = vec![
        Arc::new(StringVector::from(vec![database_name.to_string()])) as _,
        Arc::new(StringVector::from(vec![sql])) as _,
    ];
    let records =
        RecordBatches::try_from_columns(SHOW_CREATE_DATABASE_OUTPUT_SCHEMA.clone(), columns)
            .context(error::CreateRecordBatchSnafu)?;
    Ok(Output::new_with_record_batches(records))
}

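/// Builds the `SHOW CREATE TABLE` output for a table, attaching the partition
/// rules (if any) with the session's quote style applied.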
pub fn show_create_table(
    table: TableRef,
    schema_options: Option<SchemaOptions>,
    partitions: Option<Partitions>,
    query_ctx: QueryContextRef,
) -> Result<Output> {
    let table_info = table.table_info();
    let table_name = &table_info.name;

    let quote_style = query_ctx.quote_style();

    let mut stmt = create_table_stmt(&table_info, schema_options, quote_style)?;
    stmt.partitions = partitions.map(|mut p| {
        p.set_quote(quote_style);
        p
    });
    let sql = format!("{}", stmt);
    let columns = vec![
        Arc::new(StringVector::from(vec![table_name.clone()])) as _,
        Arc::new(StringVector::from(vec![sql])) as _,
    ];
    let records = RecordBatches::try_from_columns(SHOW_CREATE_TABLE_OUTPUT_SCHEMA.clone(), columns)
        .context(error::CreateRecordBatchSnafu)?;

    Ok(Output::new_with_record_batches(records))
}

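/// Renders a PostgreSQL `CREATE FOREIGN TABLE` statement for the table,
/// mapping column types to their PostgreSQL names and skipping metric-engine
/// internal columns.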
pub fn show_create_foreign_table_for_pg(
    table: TableRef,
    _query_ctx: QueryContextRef,
) -> Result<Output> {
    let table_info = table.table_info();

    let table_meta = &table_info.meta;
    let table_name = &table_info.name;
    let schema = &table_info.meta.schema;
    let is_metric_engine = is_metric_engine(&table_meta.engine);

    let columns = schema
        .column_schemas()
        .iter()
        .filter_map(|c| {
            if is_metric_engine && is_metric_engine_internal_column(&c.name) {
                None
            } else {
                Some(format!(
                    "\"{}\" {}",
                    c.name,
                    c.data_type.postgres_datatype_name()
                ))
            }
        })
        .join(",\n  ");

    let sql = format!(
        r#"CREATE FOREIGN TABLE ft_{} (
  {}
)
SERVER greptimedb
OPTIONS (table_name '{}')"#,
        table_name, columns, table_name
    );

    let columns = vec![
        Arc::new(StringVector::from(vec![table_name.clone()])) as _,
        Arc::new(StringVector::from(vec![sql])) as _,
    ];
    let records = RecordBatches::try_from_columns(SHOW_CREATE_TABLE_OUTPUT_SCHEMA.clone(), columns)
        .context(error::CreateRecordBatchSnafu)?;

    Ok(Output::new_with_record_batches(records))
}

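/// Builds the `SHOW CREATE VIEW` output by re-parsing the stored view
/// definition and rendering it as a `CREATE VIEW` statement.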
pub fn show_create_view(
    view_name: ObjectName,
    definition: &str,
    query_ctx: QueryContextRef,
) -> Result<Output> {
    let mut parser_ctx =
        ParserContext::new(query_ctx.sql_dialect(), definition).context(error::SqlSnafu)?;

    let Statement::CreateView(create_view) =
        parser_ctx.parse_statement().context(error::SqlSnafu)?
    else {
        unreachable!();
    };

    let stmt = CreateView {
        name: view_name.clone(),
        columns: create_view.columns,
        query: create_view.query,
        or_replace: create_view.or_replace,
        if_not_exists: create_view.if_not_exists,
    };

    let sql = format!("{}", stmt);
    let columns = vec![
        Arc::new(StringVector::from(vec![view_name.to_string()])) as _,
        Arc::new(StringVector::from(vec![sql])) as _,
    ];
    let records = RecordBatches::try_from_columns(SHOW_CREATE_VIEW_OUTPUT_SCHEMA.clone(), columns)
        .context(error::CreateRecordBatchSnafu)?;

    Ok(Output::new_with_record_batches(records))
}

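/// Executes `SHOW VIEWS` by querying `information_schema.views`.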
pub async fn show_views(
    stmt: ShowViews,
    query_engine: &QueryEngineRef,
    catalog_manager: &CatalogManagerRef,
    query_ctx: QueryContextRef,
) -> Result<Output> {
    let schema_name = if let Some(database) = stmt.database {
        database
    } else {
        query_ctx.current_schema()
    };

    let projects = vec![(tables::TABLE_NAME, VIEWS_COLUMN)];
    let filters = vec![
        col(tables::TABLE_SCHEMA).eq(lit(schema_name.clone())),
        col(tables::TABLE_CATALOG).eq(lit(query_ctx.current_catalog())),
    ];
    let like_field = Some(tables::TABLE_NAME);
    let sort = vec![col(tables::TABLE_NAME).sort(true, true)];

    query_from_information_schema_table(
        query_engine,
        catalog_manager,
        query_ctx,
        VIEWS,
        vec![],
        projects,
        filters,
        like_field,
        sort,
        stmt.kind,
    )
    .await
}

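/// Executes `SHOW FLOWS` by querying `information_schema.flows`.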
pub async fn show_flows(
    stmt: ShowFlows,
    query_engine: &QueryEngineRef,
    catalog_manager: &CatalogManagerRef,
    query_ctx: QueryContextRef,
) -> Result<Output> {
    let projects = vec![(flows::FLOW_NAME, FLOWS_COLUMN)];
    let filters = vec![col(flows::TABLE_CATALOG).eq(lit(query_ctx.current_catalog()))];
    let like_field = Some(flows::FLOW_NAME);
    let sort = vec![col(flows::FLOW_NAME).sort(true, true)];

    query_from_information_schema_table(
        query_engine,
        catalog_manager,
        query_ctx,
        FLOWS,
        vec![],
        projects,
        filters,
        like_field,
        sort,
        stmt.kind,
    )
    .await
}

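/// Builds the `SHOW CREATE FLOW` output by re-parsing the flow's raw SQL and
/// rendering a `CREATE FLOW` statement with its sink table, expiry and comment.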
pub fn show_create_flow(
    flow_name: ObjectName,
    flow_val: FlowInfoValue,
    query_ctx: QueryContextRef,
) -> Result<Output> {
    let mut parser_ctx =
        ParserContext::new(query_ctx.sql_dialect(), flow_val.raw_sql()).context(error::SqlSnafu)?;

    let query = parser_ctx.parser_query().context(error::SqlSnafu)?;

    let comment = if flow_val.comment().is_empty() {
        None
    } else {
        Some(flow_val.comment().clone())
    };

    let stmt = CreateFlow {
        flow_name,
        sink_table_name: ObjectName(vec![Ident::new(&flow_val.sink_table_name().table_name)]),
        or_replace: false,
        if_not_exists: true,
        expire_after: flow_val.expire_after(),
        comment,
        query,
    };

    let sql = format!("{}", stmt);
    let columns = vec![
        Arc::new(StringVector::from(vec![flow_val.flow_name().clone()])) as _,
        Arc::new(StringVector::from(vec![sql])) as _,
    ];
    let records = RecordBatches::try_from_columns(SHOW_CREATE_FLOW_OUTPUT_SCHEMA.clone(), columns)
        .context(error::CreateRecordBatchSnafu)?;

    Ok(Output::new_with_record_batches(records))
}

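/// Executes `DESC TABLE`, producing one row per column with its name, type,
/// key, nullability, default value and semantic type.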
pub fn describe_table(table: TableRef) -> Result<Output> {
    let table_info = table.table_info();
    let columns_schemas = table_info.meta.schema.column_schemas();
    let columns = vec![
        describe_column_names(columns_schemas),
        describe_column_types(columns_schemas),
        describe_column_keys(columns_schemas, &table_info.meta.primary_key_indices),
        describe_column_nullables(columns_schemas),
        describe_column_defaults(columns_schemas),
        describe_column_semantic_types(columns_schemas, &table_info.meta.primary_key_indices),
    ];
    let records = RecordBatches::try_from_columns(DESCRIBE_TABLE_OUTPUT_SCHEMA.clone(), columns)
        .context(error::CreateRecordBatchSnafu)?;
    Ok(Output::new_with_record_batches(records))
}

fn describe_column_names(columns_schemas: &[ColumnSchema]) -> VectorRef {
    Arc::new(StringVector::from_iterator(
        columns_schemas.iter().map(|cs| cs.name.as_str()),
    ))
}

fn describe_column_types(columns_schemas: &[ColumnSchema]) -> VectorRef {
    Arc::new(StringVector::from(
        columns_schemas
            .iter()
            .map(|cs| cs.data_type.name())
            .collect::<Vec<_>>(),
    ))
}

fn describe_column_keys(
    columns_schemas: &[ColumnSchema],
    primary_key_indices: &[usize],
) -> VectorRef {
    Arc::new(StringVector::from_iterator(
        columns_schemas.iter().enumerate().map(|(i, cs)| {
            if cs.is_time_index() || primary_key_indices.contains(&i) {
                PRI_KEY
            } else {
                ""
            }
        }),
    ))
}

fn describe_column_nullables(columns_schemas: &[ColumnSchema]) -> VectorRef {
    Arc::new(StringVector::from_iterator(columns_schemas.iter().map(
        |cs| {
            if cs.is_nullable() {
                YES_STR
            } else {
                NO_STR
            }
        },
    )))
}

fn describe_column_defaults(columns_schemas: &[ColumnSchema]) -> VectorRef {
    Arc::new(StringVector::from(
        columns_schemas
            .iter()
            .map(|cs| {
                cs.default_constraint()
                    .map_or(String::from(""), |dc| dc.to_string())
            })
            .collect::<Vec<String>>(),
    ))
}

fn describe_column_semantic_types(
    columns_schemas: &[ColumnSchema],
    primary_key_indices: &[usize],
) -> VectorRef {
    Arc::new(StringVector::from_iterator(
        columns_schemas.iter().enumerate().map(|(i, cs)| {
            if primary_key_indices.contains(&i) {
                SEMANTIC_TYPE_PRIMARY_KEY
            } else if cs.is_time_index() {
                SEMANTIC_TYPE_TIME_INDEX
            } else {
                SEMANTIC_TYPE_FIELD
            }
        }),
    ))
}

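/// Resolves the object store backing a file table from its options and lists
/// the matching files, honoring the optional filename pattern and skipping
/// directory entries.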
pub async fn prepare_file_table_files(
    options: &HashMap<String, String>,
) -> Result<(ObjectStore, Vec<String>)> {
    let url = options
        .get(FILE_TABLE_LOCATION_KEY)
        .context(error::MissingRequiredFieldSnafu {
            name: FILE_TABLE_LOCATION_KEY,
        })?;

    let (dir, filename) = find_dir_and_filename(url);
    let source = if let Some(filename) = filename {
        Source::Filename(filename)
    } else {
        Source::Dir
    };
    let regex = options
        .get(FILE_TABLE_PATTERN_KEY)
        .map(|x| Regex::new(x))
        .transpose()
        .context(error::BuildRegexSnafu)?;
    let object_store = build_backend(url, options).context(error::BuildBackendSnafu)?;
    let lister = Lister::new(object_store.clone(), source, dir, regex);
    let files = lister
        .list()
        .await
        .context(error::ListObjectsSnafu)?
        .into_iter()
        .filter_map(|entry| {
            if entry.path().ends_with('/') {
                None
            } else {
                Some(entry.path().to_string())
            }
        })
        .collect::<Vec<_>>();
    Ok((object_store, files))
}

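/// Infers a file table's schema by merging the schemas inferred from each of
/// its backing files.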
pub async fn infer_file_table_schema(
    object_store: &ObjectStore,
    files: &[String],
    options: &HashMap<String, String>,
) -> Result<RawSchema> {
    let format = parse_file_table_format(options)?;
    let merged = infer_schemas(object_store, files, format.as_ref())
        .await
        .context(error::InferSchemaSnafu)?;
    Ok(RawSchema::from(
        &Schema::try_from(merged).context(error::ConvertSchemaSnafu)?,
    ))
}

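/// Converts file column schemas into table column schemas, guaranteeing a time
/// index: when none of the file columns is a time index, a `greptime_timestamp`
/// column with a default of 0 is appended (or overrides a same-named column).
/// Returns the column schemas together with the time index column name.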
pub fn file_column_schemas_to_table(
    file_column_schemas: &[ColumnSchema],
) -> (Vec<ColumnSchema>, String) {
    let mut column_schemas = file_column_schemas.to_owned();
    if let Some(time_index_column) = column_schemas.iter().find(|c| c.is_time_index()) {
        let time_index = time_index_column.name.clone();
        return (column_schemas, time_index);
    }

    let timestamp_type = ConcreteDataType::timestamp_millisecond_datatype();
    let default_zero = Value::Timestamp(Timestamp::new_millisecond(0));
    let timestamp_column_schema = ColumnSchema::new(GREPTIME_TIMESTAMP, timestamp_type, false)
        .with_time_index(true)
        .with_default_constraint(Some(ColumnDefaultConstraint::Value(default_zero)))
        .unwrap();

    if let Some(column_schema) = column_schemas
        .iter_mut()
        .find(|column_schema| column_schema.name == GREPTIME_TIMESTAMP)
    {
        *column_schema = timestamp_column_schema;
    } else {
        column_schemas.push(timestamp_column_schema);
    }

    (column_schemas, GREPTIME_TIMESTAMP.to_string())
}

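/// Checks that a file table's schema is compatible with the target table:
/// every table column must either map to a file column castable to its type,
/// or be nullable / carry a default value.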
pub fn check_file_to_table_schema_compatibility(
    file_column_schemas: &[ColumnSchema],
    table_column_schemas: &[ColumnSchema],
) -> Result<()> {
    let file_schemas_map = file_column_schemas
        .iter()
        .map(|s| (s.name.clone(), s))
        .collect::<HashMap<_, _>>();

    for table_column in table_column_schemas {
        if let Some(file_column) = file_schemas_map.get(&table_column.name) {
            ensure!(
                file_column
                    .data_type
                    .can_arrow_type_cast_to(&table_column.data_type),
                error::ColumnSchemaIncompatibleSnafu {
                    column: table_column.name.clone(),
                    file_type: file_column.data_type.clone(),
                    table_type: table_column.data_type.clone(),
                }
            );
        } else {
            ensure!(
                table_column.is_nullable() || table_column.default_constraint().is_some(),
                error::ColumnSchemaNoDefaultSnafu {
                    column: table_column.name.clone(),
                }
            );
        }
    }

    Ok(())
}

fn parse_file_table_format(options: &HashMap<String, String>) -> Result<Box<dyn FileFormat>> {
    Ok(
        match Format::try_from(options).context(error::ParseFileFormatSnafu)? {
            Format::Csv(format) => Box::new(format),
            Format::Json(format) => Box::new(format),
            Format::Parquet(format) => Box::new(format),
            Format::Orc(format) => Box::new(format),
        },
    )
}

#[cfg(test)]
mod test {
    use std::sync::Arc;

    use common_query::{Output, OutputData};
    use common_recordbatch::{RecordBatch, RecordBatches};
    use common_time::timestamp::TimeUnit;
    use common_time::Timezone;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema, SchemaRef};
    use datatypes::vectors::{StringVector, TimestampMillisecondVector, UInt32Vector, VectorRef};
    use session::context::QueryContextBuilder;
    use snafu::ResultExt;
    use sql::ast::{Ident, ObjectName};
    use sql::statements::show::ShowVariables;
    use table::test_util::MemTable;
    use table::TableRef;

    use super::show_variable;
    use crate::error;
    use crate::error::Result;
    use crate::sql::{
        describe_table, DESCRIBE_TABLE_OUTPUT_SCHEMA, NO_STR, SEMANTIC_TYPE_FIELD,
        SEMANTIC_TYPE_TIME_INDEX, YES_STR,
    };

    #[test]
    fn test_describe_table_multiple_columns() -> Result<()> {
        let table_name = "test_table";
        let schema = vec![
            ColumnSchema::new("t1", ConcreteDataType::uint32_datatype(), true),
            ColumnSchema::new(
                "t2",
                ConcreteDataType::timestamp_datatype(TimeUnit::Millisecond),
                false,
            )
            .with_default_constraint(Some(ColumnDefaultConstraint::Function(String::from(
                "current_timestamp()",
            ))))
            .unwrap()
            .with_time_index(true),
        ];
        let data = vec![
            Arc::new(UInt32Vector::from_slice([0])) as _,
            Arc::new(TimestampMillisecondVector::from_slice([0])) as _,
        ];
        let expected_columns = vec![
            Arc::new(StringVector::from(vec!["t1", "t2"])) as _,
            Arc::new(StringVector::from(vec!["UInt32", "TimestampMillisecond"])) as _,
            Arc::new(StringVector::from(vec!["", "PRI"])) as _,
            Arc::new(StringVector::from(vec![YES_STR, NO_STR])) as _,
            Arc::new(StringVector::from(vec!["", "current_timestamp()"])) as _,
            Arc::new(StringVector::from(vec![
                SEMANTIC_TYPE_FIELD,
                SEMANTIC_TYPE_TIME_INDEX,
            ])) as _,
        ];

        describe_table_test_by_schema(table_name, schema, data, expected_columns)
    }

    fn describe_table_test_by_schema(
        table_name: &str,
        schema: Vec<ColumnSchema>,
        data: Vec<VectorRef>,
        expected_columns: Vec<VectorRef>,
    ) -> Result<()> {
        let table_schema = SchemaRef::new(Schema::new(schema));
        let table = prepare_describe_table(table_name, table_schema, data);

        let expected =
            RecordBatches::try_from_columns(DESCRIBE_TABLE_OUTPUT_SCHEMA.clone(), expected_columns)
                .context(error::CreateRecordBatchSnafu)?;

        if let OutputData::RecordBatches(res) = describe_table(table)?.data {
            assert_eq!(res.take(), expected.take());
        } else {
            panic!("describe table must return record batch");
        }

        Ok(())
    }

    fn prepare_describe_table(
        table_name: &str,
        table_schema: SchemaRef,
        data: Vec<VectorRef>,
    ) -> TableRef {
        let record_batch = RecordBatch::new(table_schema, data).unwrap();
        MemTable::table(table_name, record_batch)
    }

    #[test]
    fn test_show_variable() {
        assert_eq!(
            exec_show_variable("SYSTEM_TIME_ZONE", "Asia/Shanghai").unwrap(),
            "UTC"
        );
        assert_eq!(
            exec_show_variable("SYSTEM_TIMEZONE", "Asia/Shanghai").unwrap(),
            "UTC"
        );
        assert_eq!(
            exec_show_variable("TIME_ZONE", "Asia/Shanghai").unwrap(),
            "Asia/Shanghai"
        );
        assert_eq!(
            exec_show_variable("TIMEZONE", "Asia/Shanghai").unwrap(),
            "Asia/Shanghai"
        );
        assert!(exec_show_variable("TIME ZONE", "Asia/Shanghai").is_err());
        assert!(exec_show_variable("SYSTEM TIME ZONE", "Asia/Shanghai").is_err());
    }

    fn exec_show_variable(variable: &str, tz: &str) -> Result<String> {
        let stmt = ShowVariables {
            variable: ObjectName(vec![Ident::new(variable)]),
        };
        let ctx = Arc::new(
            QueryContextBuilder::default()
                .timezone(Timezone::from_tz_string(tz).unwrap())
                .build(),
        );
        match show_variable(stmt, ctx) {
            Ok(Output {
                data: OutputData::RecordBatches(record),
                ..
            }) => {
                let record = record.take().first().cloned().unwrap();
                let data = record.column(0);
                Ok(data.get(0).to_string())
            }
            Ok(_) => unreachable!(),
            Err(e) => Err(e),
        }
    }
}