operator/statement/
show.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use common_error::ext::BoxedError;
18use common_meta::key::schema_name::SchemaNameKey;
19use common_query::Output;
20use common_telemetry::tracing;
21use partition::manager::PartitionInfoWithVersion;
22use session::context::QueryContextRef;
23use session::table_name::table_idents_to_full_name;
24use snafu::{OptionExt, ResultExt};
25use sql::ast::ObjectNamePartExt;
26use sql::statements::OptionMap;
27use sql::statements::create::Partitions;
28use sql::statements::show::{
29    ShowColumns, ShowCreateFlow, ShowCreateView, ShowDatabases, ShowFlows, ShowIndex, ShowKind,
30    ShowProcessList, ShowRegion, ShowTableStatus, ShowTables, ShowVariables, ShowViews,
31};
32use table::TableRef;
33use table::metadata::{TableInfo, TableType};
34use table::table_name::TableName;
35
36use crate::error::{
37    self, CatalogSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, ExternalSnafu,
38    FindViewInfoSnafu, InvalidSqlSnafu, Result, TableMetadataManagerSnafu, ViewInfoNotFoundSnafu,
39    ViewNotFoundSnafu,
40};
41use crate::statement::StatementExecutor;
42
43impl StatementExecutor {
44    #[tracing::instrument(skip_all)]
45    pub(super) async fn show_databases(
46        &self,
47        stmt: ShowDatabases,
48        query_ctx: QueryContextRef,
49    ) -> Result<Output> {
50        query::sql::show_databases(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
51            .await
52            .context(ExecuteStatementSnafu)
53    }
54
55    #[tracing::instrument(skip_all)]
56    pub(super) async fn show_tables(
57        &self,
58        stmt: ShowTables,
59        query_ctx: QueryContextRef,
60    ) -> Result<Output> {
61        query::sql::show_tables(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
62            .await
63            .context(ExecuteStatementSnafu)
64    }
65
66    #[tracing::instrument(skip_all)]
67    pub(super) async fn show_table_status(
68        &self,
69        stmt: ShowTableStatus,
70        query_ctx: QueryContextRef,
71    ) -> Result<Output> {
72        query::sql::show_table_status(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
73            .await
74            .context(ExecuteStatementSnafu)
75    }
76
77    #[tracing::instrument(skip_all)]
78    pub(super) async fn show_columns(
79        &self,
80        stmt: ShowColumns,
81        query_ctx: QueryContextRef,
82    ) -> Result<Output> {
83        query::sql::show_columns(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
84            .await
85            .context(ExecuteStatementSnafu)
86    }
87
88    #[tracing::instrument(skip_all)]
89    pub(super) async fn show_index(
90        &self,
91        stmt: ShowIndex,
92        query_ctx: QueryContextRef,
93    ) -> Result<Output> {
94        query::sql::show_index(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
95            .await
96            .context(ExecuteStatementSnafu)
97    }
98
99    pub(super) async fn show_region(
100        &self,
101        stmt: ShowRegion,
102        query_ctx: QueryContextRef,
103    ) -> Result<Output> {
104        query::sql::show_region(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
105            .await
106            .context(ExecuteStatementSnafu)
107    }
108
109    #[tracing::instrument(skip_all)]
110    pub async fn show_create_database(
111        &self,
112        database_name: &str,
113        opts: OptionMap,
114    ) -> Result<Output> {
115        query::sql::show_create_database(database_name, opts).context(ExecuteStatementSnafu)
116    }
117
118    #[tracing::instrument(skip_all)]
119    pub async fn show_create_table(
120        &self,
121        table_name: TableName,
122        table: TableRef,
123        query_ctx: QueryContextRef,
124    ) -> Result<Output> {
125        let mut table_info = table.table_info();
126        let partition_column_names: Vec<_> =
127            table_info.meta.partition_column_names().cloned().collect();
128
129        if let Some(latest) = self
130            .table_metadata_manager
131            .table_info_manager()
132            .get(table_info.table_id())
133            .await
134            .context(TableMetadataManagerSnafu)?
135        {
136            let mut latest_info = latest.into_inner().table_info;
137
138            if !partition_column_names.is_empty() {
139                latest_info.meta.partition_key_indices = partition_column_names
140                    .iter()
141                    .filter_map(|name| latest_info.meta.schema.column_index_by_name(name.as_str()))
142                    .collect();
143            }
144
145            table_info = Arc::new(latest_info);
146        }
147
148        if table_info.table_type != TableType::Base {
149            return error::ShowCreateTableBaseOnlySnafu {
150                table_name: table_name.to_string(),
151                table_type: table_info.table_type,
152            }
153            .fail();
154        }
155
156        let schema_options = self
157            .table_metadata_manager
158            .schema_manager()
159            .get(SchemaNameKey {
160                catalog: &table_name.catalog_name,
161                schema: &table_name.schema_name,
162            })
163            .await
164            .context(TableMetadataManagerSnafu)?
165            .map(|v| v.into_inner());
166
167        let partition_info = self
168            .partition_manager
169            .find_physical_partition_info(table_info.table_id())
170            .await
171            .context(error::FindTablePartitionRuleSnafu {
172                table_name: &table_name.table_name,
173            })?;
174
175        let partitions = create_partitions_stmt(&table_info, &partition_info.partitions)?;
176
177        query::sql::show_create_table(table_info, schema_options, partitions, query_ctx)
178            .context(ExecuteStatementSnafu)
179    }
180
181    #[tracing::instrument(skip_all)]
182    pub async fn show_create_table_for_pg(
183        &self,
184        table_name: TableName,
185        table: TableRef,
186        query_ctx: QueryContextRef,
187    ) -> Result<Output> {
188        let table_info = table.table_info();
189        if table_info.table_type != TableType::Base {
190            return error::ShowCreateTableBaseOnlySnafu {
191                table_name: table_name.to_string(),
192                table_type: table_info.table_type,
193            }
194            .fail();
195        }
196
197        query::sql::show_create_foreign_table_for_pg(table, query_ctx)
198            .context(ExecuteStatementSnafu)
199    }
200
201    #[tracing::instrument(skip_all)]
202    pub async fn show_create_view(
203        &self,
204        show: ShowCreateView,
205        query_ctx: QueryContextRef,
206    ) -> Result<Output> {
207        let (catalog, schema, view) = table_idents_to_full_name(&show.view_name, &query_ctx)
208            .map_err(BoxedError::new)
209            .context(ExternalSnafu)?;
210
211        let table_ref = self
212            .catalog_manager
213            .table(&catalog, &schema, &view, Some(&query_ctx))
214            .await
215            .context(CatalogSnafu)?
216            .context(ViewNotFoundSnafu { view_name: &view })?;
217
218        let view_id = table_ref.table_info().ident.table_id;
219
220        let view_info = self
221            .view_info_manager
222            .get(view_id)
223            .await
224            .context(FindViewInfoSnafu { view_name: &view })?
225            .context(ViewInfoNotFoundSnafu { view_name: &view })?;
226
227        query::sql::show_create_view(show.view_name, &view_info.definition, query_ctx)
228            .context(error::ExecuteStatementSnafu)
229    }
230
231    #[tracing::instrument(skip_all)]
232    pub(super) async fn show_views(
233        &self,
234        stmt: ShowViews,
235        query_ctx: QueryContextRef,
236    ) -> Result<Output> {
237        query::sql::show_views(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
238            .await
239            .context(ExecuteStatementSnafu)
240    }
241
242    #[tracing::instrument(skip_all)]
243    pub(super) async fn show_flows(
244        &self,
245        stmt: ShowFlows,
246        query_ctx: QueryContextRef,
247    ) -> Result<Output> {
248        query::sql::show_flows(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
249            .await
250            .context(ExecuteStatementSnafu)
251    }
252
253    #[cfg(feature = "enterprise")]
254    #[tracing::instrument(skip_all)]
255    pub(super) async fn show_triggers(
256        &self,
257        stmt: sql::statements::show::trigger::ShowTriggers,
258        query_ctx: QueryContextRef,
259    ) -> Result<Output> {
260        query::sql::show_triggers(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
261            .await
262            .context(ExecuteStatementSnafu)
263    }
264
265    #[tracing::instrument(skip_all)]
266    pub async fn show_create_flow(
267        &self,
268        show: ShowCreateFlow,
269        query_ctx: QueryContextRef,
270    ) -> Result<Output> {
271        let obj_name = &show.flow_name;
272        let (catalog_name, flow_name) = match &obj_name.0[..] {
273            [flow] => (query_ctx.current_catalog().to_string(), flow.to_string_unquoted()),
274            [catalog, flow] => (catalog.to_string_unquoted(), flow.to_string_unquoted()),
275            _ => {
276                return InvalidSqlSnafu {
277                    err_msg: format!(
278                        "expect flow name to be <catalog>.<flow_name> or <flow_name>, actual: {obj_name}",
279                    ),
280                }
281                .fail()
282            }
283        };
284
285        let flow_name_val = self
286            .flow_metadata_manager
287            .flow_name_manager()
288            .get(&catalog_name, &flow_name)
289            .await
290            .context(error::TableMetadataManagerSnafu)?
291            .context(error::FlowNotFoundSnafu {
292                flow_name: &flow_name,
293            })?;
294
295        let flow_val = self
296            .flow_metadata_manager
297            .flow_info_manager()
298            .get(flow_name_val.flow_id())
299            .await
300            .context(error::TableMetadataManagerSnafu)?
301            .context(error::FlowNotFoundSnafu {
302                flow_name: &flow_name,
303            })?;
304
305        query::sql::show_create_flow(obj_name.clone(), flow_val, query_ctx)
306            .context(error::ExecuteStatementSnafu)
307    }
308
309    #[cfg(feature = "enterprise")]
310    #[tracing::instrument(skip_all)]
311    pub async fn show_create_trigger(
312        &self,
313        show: sql::statements::show::trigger::ShowCreateTrigger,
314        query_ctx: QueryContextRef,
315    ) -> Result<Output> {
316        let Some(trigger_querier) = self.trigger_querier.as_ref() else {
317            return error::MissingTriggerQuerierSnafu.fail();
318        };
319
320        let obj_name = &show.trigger_name;
321        let (catalog_name, trigger_name) = match &obj_name.0[..] {
322            [trigger] => (query_ctx.current_catalog().to_string(), trigger.to_string_unquoted()),
323            [catalog, trigger] => (catalog.to_string_unquoted(), trigger.to_string_unquoted()),
324            _ => {
325                return InvalidSqlSnafu {
326                    err_msg: format!(
327                        "expect trigger name to be <catalog>.<trigger_name> or <trigger_name>, actual: {obj_name}",
328                    ),
329                }
330                .fail()
331            }
332        };
333        trigger_querier
334            .show_create_trigger(&catalog_name, &trigger_name, &query_ctx)
335            .await
336            .context(error::TriggerQuerierSnafu)
337    }
338
339    #[tracing::instrument(skip_all)]
340    pub fn show_variable(&self, stmt: ShowVariables, query_ctx: QueryContextRef) -> Result<Output> {
341        query::sql::show_variable(stmt, query_ctx).context(error::ExecuteStatementSnafu)
342    }
343
344    #[tracing::instrument(skip_all)]
345    pub async fn show_collation(
346        &self,
347        kind: ShowKind,
348        query_ctx: QueryContextRef,
349    ) -> Result<Output> {
350        query::sql::show_collations(kind, &self.query_engine, &self.catalog_manager, query_ctx)
351            .await
352            .context(error::ExecuteStatementSnafu)
353    }
354
355    #[tracing::instrument(skip_all)]
356    pub async fn show_charset(&self, kind: ShowKind, query_ctx: QueryContextRef) -> Result<Output> {
357        query::sql::show_charsets(kind, &self.query_engine, &self.catalog_manager, query_ctx)
358            .await
359            .context(error::ExecuteStatementSnafu)
360    }
361
362    #[tracing::instrument(skip_all)]
363    pub async fn show_status(&self, query_ctx: QueryContextRef) -> Result<Output> {
364        query::sql::show_status(query_ctx)
365            .await
366            .context(error::ExecuteStatementSnafu)
367    }
368    pub async fn show_search_path(&self, query_ctx: QueryContextRef) -> Result<Output> {
369        query::sql::show_search_path(query_ctx)
370            .await
371            .context(error::ExecuteStatementSnafu)
372    }
373
374    pub async fn show_processlist(
375        &self,
376        stmt: ShowProcessList,
377        query_ctx: QueryContextRef,
378    ) -> Result<Output> {
379        query::sql::show_processlist(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
380            .await
381            .context(ExecLogicalPlanSnafu)
382    }
383}
384
385pub(crate) fn create_partitions_stmt(
386    table_info: &TableInfo,
387    partitions: &[PartitionInfoWithVersion],
388) -> Result<Option<Partitions>> {
389    if partitions.is_empty() {
390        return Ok(None);
391    }
392
393    let column_list = table_info
394        .meta
395        .partition_column_names()
396        .map(|name| name[..].into())
397        .collect();
398
399    let exprs = partitions
400        .iter()
401        .filter_map(|partition| {
402            partition
403                .partition_expr
404                .as_ref()
405                .map(|expr| expr.to_parser_expr())
406        })
407        .collect();
408
409    Ok(Some(Partitions { column_list, exprs }))
410}