operator/statement/
show.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_error::ext::BoxedError;
16use common_meta::key::schema_name::SchemaNameKey;
17use common_query::Output;
18use common_telemetry::tracing;
19use partition::manager::PartitionInfo;
20use partition::partition::PartitionBound;
21use session::context::QueryContextRef;
22use session::table_name::table_idents_to_full_name;
23use snafu::{OptionExt, ResultExt};
24use sql::ast::Ident;
25use sql::statements::create::Partitions;
26use sql::statements::show::{
27    ShowColumns, ShowCreateFlow, ShowCreateView, ShowDatabases, ShowFlows, ShowIndex, ShowKind,
28    ShowProcessList, ShowRegion, ShowTableStatus, ShowTables, ShowVariables, ShowViews,
29};
30use sql::statements::OptionMap;
31use table::metadata::TableType;
32use table::table_name::TableName;
33use table::TableRef;
34
35use crate::error::{
36    self, CatalogSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, ExternalSnafu,
37    FindViewInfoSnafu, InvalidSqlSnafu, Result, TableMetadataManagerSnafu, ViewInfoNotFoundSnafu,
38    ViewNotFoundSnafu,
39};
40use crate::statement::StatementExecutor;
41
42impl StatementExecutor {
43    #[tracing::instrument(skip_all)]
44    pub(super) async fn show_databases(
45        &self,
46        stmt: ShowDatabases,
47        query_ctx: QueryContextRef,
48    ) -> Result<Output> {
49        query::sql::show_databases(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
50            .await
51            .context(ExecuteStatementSnafu)
52    }
53
54    #[tracing::instrument(skip_all)]
55    pub(super) async fn show_tables(
56        &self,
57        stmt: ShowTables,
58        query_ctx: QueryContextRef,
59    ) -> Result<Output> {
60        query::sql::show_tables(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
61            .await
62            .context(ExecuteStatementSnafu)
63    }
64
65    #[tracing::instrument(skip_all)]
66    pub(super) async fn show_table_status(
67        &self,
68        stmt: ShowTableStatus,
69        query_ctx: QueryContextRef,
70    ) -> Result<Output> {
71        query::sql::show_table_status(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
72            .await
73            .context(ExecuteStatementSnafu)
74    }
75
76    #[tracing::instrument(skip_all)]
77    pub(super) async fn show_columns(
78        &self,
79        stmt: ShowColumns,
80        query_ctx: QueryContextRef,
81    ) -> Result<Output> {
82        query::sql::show_columns(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
83            .await
84            .context(ExecuteStatementSnafu)
85    }
86
87    #[tracing::instrument(skip_all)]
88    pub(super) async fn show_index(
89        &self,
90        stmt: ShowIndex,
91        query_ctx: QueryContextRef,
92    ) -> Result<Output> {
93        query::sql::show_index(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
94            .await
95            .context(ExecuteStatementSnafu)
96    }
97
98    pub(super) async fn show_region(
99        &self,
100        stmt: ShowRegion,
101        query_ctx: QueryContextRef,
102    ) -> Result<Output> {
103        query::sql::show_region(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
104            .await
105            .context(ExecuteStatementSnafu)
106    }
107
108    #[tracing::instrument(skip_all)]
109    pub async fn show_create_database(
110        &self,
111        database_name: &str,
112        opts: OptionMap,
113    ) -> Result<Output> {
114        query::sql::show_create_database(database_name, opts).context(ExecuteStatementSnafu)
115    }
116
117    #[tracing::instrument(skip_all)]
118    pub async fn show_create_table(
119        &self,
120        table_name: TableName,
121        table: TableRef,
122        query_ctx: QueryContextRef,
123    ) -> Result<Output> {
124        let table_info = table.table_info();
125        if table_info.table_type != TableType::Base {
126            return error::ShowCreateTableBaseOnlySnafu {
127                table_name: table_name.to_string(),
128                table_type: table_info.table_type,
129            }
130            .fail();
131        }
132
133        let schema_options = self
134            .table_metadata_manager
135            .schema_manager()
136            .get(SchemaNameKey {
137                catalog: &table_name.catalog_name,
138                schema: &table_name.schema_name,
139            })
140            .await
141            .context(TableMetadataManagerSnafu)?
142            .map(|v| v.into_inner());
143
144        let partitions = self
145            .partition_manager
146            .find_table_partitions(table.table_info().table_id())
147            .await
148            .context(error::FindTablePartitionRuleSnafu {
149                table_name: &table_name.table_name,
150            })?;
151
152        let partitions = create_partitions_stmt(partitions)?;
153
154        query::sql::show_create_table(table, schema_options, partitions, query_ctx)
155            .context(ExecuteStatementSnafu)
156    }
157
158    #[tracing::instrument(skip_all)]
159    pub async fn show_create_table_for_pg(
160        &self,
161        table_name: TableName,
162        table: TableRef,
163        query_ctx: QueryContextRef,
164    ) -> Result<Output> {
165        let table_info = table.table_info();
166        if table_info.table_type != TableType::Base {
167            return error::ShowCreateTableBaseOnlySnafu {
168                table_name: table_name.to_string(),
169                table_type: table_info.table_type,
170            }
171            .fail();
172        }
173
174        query::sql::show_create_foreign_table_for_pg(table, query_ctx)
175            .context(ExecuteStatementSnafu)
176    }
177
178    #[tracing::instrument(skip_all)]
179    pub async fn show_create_view(
180        &self,
181        show: ShowCreateView,
182        query_ctx: QueryContextRef,
183    ) -> Result<Output> {
184        let (catalog, schema, view) = table_idents_to_full_name(&show.view_name, &query_ctx)
185            .map_err(BoxedError::new)
186            .context(ExternalSnafu)?;
187
188        let table_ref = self
189            .catalog_manager
190            .table(&catalog, &schema, &view, Some(&query_ctx))
191            .await
192            .context(CatalogSnafu)?
193            .context(ViewNotFoundSnafu { view_name: &view })?;
194
195        let view_id = table_ref.table_info().ident.table_id;
196
197        let view_info = self
198            .view_info_manager
199            .get(view_id)
200            .await
201            .context(FindViewInfoSnafu { view_name: &view })?
202            .context(ViewInfoNotFoundSnafu { view_name: &view })?;
203
204        query::sql::show_create_view(show.view_name, &view_info.definition, query_ctx)
205            .context(error::ExecuteStatementSnafu)
206    }
207
208    #[tracing::instrument(skip_all)]
209    pub(super) async fn show_views(
210        &self,
211        stmt: ShowViews,
212        query_ctx: QueryContextRef,
213    ) -> Result<Output> {
214        query::sql::show_views(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
215            .await
216            .context(ExecuteStatementSnafu)
217    }
218
219    #[tracing::instrument(skip_all)]
220    pub(super) async fn show_flows(
221        &self,
222        stmt: ShowFlows,
223        query_ctx: QueryContextRef,
224    ) -> Result<Output> {
225        query::sql::show_flows(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
226            .await
227            .context(ExecuteStatementSnafu)
228    }
229
230    #[cfg(feature = "enterprise")]
231    #[tracing::instrument(skip_all)]
232    pub(super) async fn show_triggers(
233        &self,
234        stmt: sql::statements::show::trigger::ShowTriggers,
235        query_ctx: QueryContextRef,
236    ) -> Result<Output> {
237        query::sql::show_triggers(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
238            .await
239            .context(ExecuteStatementSnafu)
240    }
241
242    #[tracing::instrument(skip_all)]
243    pub async fn show_create_flow(
244        &self,
245        show: ShowCreateFlow,
246        query_ctx: QueryContextRef,
247    ) -> Result<Output> {
248        let obj_name = &show.flow_name;
249        let (catalog_name, flow_name) = match &obj_name.0[..] {
250            [table] => (query_ctx.current_catalog().to_string(), table.value.clone()),
251            [catalog, table] => (catalog.value.clone(), table.value.clone()),
252            _ => {
253                return InvalidSqlSnafu {
254                    err_msg: format!(
255                        "expect flow name to be <catalog>.<flow_name> or <flow_name>, actual: {obj_name}",
256                    ),
257                }
258                .fail()
259            }
260        };
261
262        let flow_name_val = self
263            .flow_metadata_manager
264            .flow_name_manager()
265            .get(&catalog_name, &flow_name)
266            .await
267            .context(error::TableMetadataManagerSnafu)?
268            .context(error::FlowNotFoundSnafu {
269                flow_name: &flow_name,
270            })?;
271
272        let flow_val = self
273            .flow_metadata_manager
274            .flow_info_manager()
275            .get(flow_name_val.flow_id())
276            .await
277            .context(error::TableMetadataManagerSnafu)?
278            .context(error::FlowNotFoundSnafu {
279                flow_name: &flow_name,
280            })?;
281
282        query::sql::show_create_flow(obj_name.clone(), flow_val, query_ctx)
283            .context(error::ExecuteStatementSnafu)
284    }
285
286    #[tracing::instrument(skip_all)]
287    pub fn show_variable(&self, stmt: ShowVariables, query_ctx: QueryContextRef) -> Result<Output> {
288        query::sql::show_variable(stmt, query_ctx).context(error::ExecuteStatementSnafu)
289    }
290
291    #[tracing::instrument(skip_all)]
292    pub async fn show_collation(
293        &self,
294        kind: ShowKind,
295        query_ctx: QueryContextRef,
296    ) -> Result<Output> {
297        query::sql::show_collations(kind, &self.query_engine, &self.catalog_manager, query_ctx)
298            .await
299            .context(error::ExecuteStatementSnafu)
300    }
301
302    #[tracing::instrument(skip_all)]
303    pub async fn show_charset(&self, kind: ShowKind, query_ctx: QueryContextRef) -> Result<Output> {
304        query::sql::show_charsets(kind, &self.query_engine, &self.catalog_manager, query_ctx)
305            .await
306            .context(error::ExecuteStatementSnafu)
307    }
308
309    #[tracing::instrument(skip_all)]
310    pub async fn show_status(&self, query_ctx: QueryContextRef) -> Result<Output> {
311        query::sql::show_status(query_ctx)
312            .await
313            .context(error::ExecuteStatementSnafu)
314    }
315    pub async fn show_search_path(&self, query_ctx: QueryContextRef) -> Result<Output> {
316        query::sql::show_search_path(query_ctx)
317            .await
318            .context(error::ExecuteStatementSnafu)
319    }
320
321    pub async fn show_processlist(
322        &self,
323        stmt: ShowProcessList,
324        query_ctx: QueryContextRef,
325    ) -> Result<Output> {
326        query::sql::show_processlist(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
327            .await
328            .context(ExecLogicalPlanSnafu)
329    }
330}
331
332pub(crate) fn create_partitions_stmt(partitions: Vec<PartitionInfo>) -> Result<Option<Partitions>> {
333    if partitions.is_empty() {
334        return Ok(None);
335    }
336
337    let column_list: Vec<Ident> = partitions[0]
338        .partition
339        .partition_columns()
340        .iter()
341        .map(|name| name[..].into())
342        .collect();
343
344    let exprs = partitions
345        .iter()
346        .filter_map(|partition| {
347            partition
348                .partition
349                .partition_bounds()
350                .first()
351                .and_then(|bound| {
352                    if let PartitionBound::Expr(expr) = bound {
353                        Some(expr.to_parser_expr())
354                    } else {
355                        None
356                    }
357                })
358        })
359        .collect();
360
361    Ok(Some(Partitions { column_list, exprs }))
362}