1use common_error::ext::BoxedError;
16use common_meta::key::schema_name::SchemaNameKey;
17use common_query::Output;
18use common_telemetry::tracing;
19use partition::manager::PartitionInfo;
20use session::context::QueryContextRef;
21use session::table_name::table_idents_to_full_name;
22use snafu::{OptionExt, ResultExt};
23use sql::statements::create::Partitions;
24use sql::statements::show::{
25 ShowColumns, ShowCreateFlow, ShowCreateView, ShowDatabases, ShowFlows, ShowIndex, ShowKind,
26 ShowProcessList, ShowRegion, ShowTableStatus, ShowTables, ShowVariables, ShowViews,
27};
28use sql::statements::OptionMap;
29use table::metadata::{TableInfo, TableType};
30use table::table_name::TableName;
31use table::TableRef;
32
33use crate::error::{
34 self, CatalogSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, ExternalSnafu,
35 FindViewInfoSnafu, InvalidSqlSnafu, Result, TableMetadataManagerSnafu, ViewInfoNotFoundSnafu,
36 ViewNotFoundSnafu,
37};
38use crate::statement::StatementExecutor;
39
40impl StatementExecutor {
41 #[tracing::instrument(skip_all)]
42 pub(super) async fn show_databases(
43 &self,
44 stmt: ShowDatabases,
45 query_ctx: QueryContextRef,
46 ) -> Result<Output> {
47 query::sql::show_databases(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
48 .await
49 .context(ExecuteStatementSnafu)
50 }
51
52 #[tracing::instrument(skip_all)]
53 pub(super) async fn show_tables(
54 &self,
55 stmt: ShowTables,
56 query_ctx: QueryContextRef,
57 ) -> Result<Output> {
58 query::sql::show_tables(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
59 .await
60 .context(ExecuteStatementSnafu)
61 }
62
63 #[tracing::instrument(skip_all)]
64 pub(super) async fn show_table_status(
65 &self,
66 stmt: ShowTableStatus,
67 query_ctx: QueryContextRef,
68 ) -> Result<Output> {
69 query::sql::show_table_status(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
70 .await
71 .context(ExecuteStatementSnafu)
72 }
73
74 #[tracing::instrument(skip_all)]
75 pub(super) async fn show_columns(
76 &self,
77 stmt: ShowColumns,
78 query_ctx: QueryContextRef,
79 ) -> Result<Output> {
80 query::sql::show_columns(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
81 .await
82 .context(ExecuteStatementSnafu)
83 }
84
85 #[tracing::instrument(skip_all)]
86 pub(super) async fn show_index(
87 &self,
88 stmt: ShowIndex,
89 query_ctx: QueryContextRef,
90 ) -> Result<Output> {
91 query::sql::show_index(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
92 .await
93 .context(ExecuteStatementSnafu)
94 }
95
96 pub(super) async fn show_region(
97 &self,
98 stmt: ShowRegion,
99 query_ctx: QueryContextRef,
100 ) -> Result<Output> {
101 query::sql::show_region(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
102 .await
103 .context(ExecuteStatementSnafu)
104 }
105
106 #[tracing::instrument(skip_all)]
107 pub async fn show_create_database(
108 &self,
109 database_name: &str,
110 opts: OptionMap,
111 ) -> Result<Output> {
112 query::sql::show_create_database(database_name, opts).context(ExecuteStatementSnafu)
113 }
114
115 #[tracing::instrument(skip_all)]
116 pub async fn show_create_table(
117 &self,
118 table_name: TableName,
119 table: TableRef,
120 query_ctx: QueryContextRef,
121 ) -> Result<Output> {
122 let table_info = table.table_info();
123 if table_info.table_type != TableType::Base {
124 return error::ShowCreateTableBaseOnlySnafu {
125 table_name: table_name.to_string(),
126 table_type: table_info.table_type,
127 }
128 .fail();
129 }
130
131 let schema_options = self
132 .table_metadata_manager
133 .schema_manager()
134 .get(SchemaNameKey {
135 catalog: &table_name.catalog_name,
136 schema: &table_name.schema_name,
137 })
138 .await
139 .context(TableMetadataManagerSnafu)?
140 .map(|v| v.into_inner());
141
142 let partitions = self
143 .partition_manager
144 .find_table_partitions(table_info.table_id())
145 .await
146 .context(error::FindTablePartitionRuleSnafu {
147 table_name: &table_name.table_name,
148 })?;
149
150 let partitions = create_partitions_stmt(&table_info, partitions)?;
151
152 query::sql::show_create_table(table, schema_options, partitions, query_ctx)
153 .context(ExecuteStatementSnafu)
154 }
155
156 #[tracing::instrument(skip_all)]
157 pub async fn show_create_table_for_pg(
158 &self,
159 table_name: TableName,
160 table: TableRef,
161 query_ctx: QueryContextRef,
162 ) -> Result<Output> {
163 let table_info = table.table_info();
164 if table_info.table_type != TableType::Base {
165 return error::ShowCreateTableBaseOnlySnafu {
166 table_name: table_name.to_string(),
167 table_type: table_info.table_type,
168 }
169 .fail();
170 }
171
172 query::sql::show_create_foreign_table_for_pg(table, query_ctx)
173 .context(ExecuteStatementSnafu)
174 }
175
176 #[tracing::instrument(skip_all)]
177 pub async fn show_create_view(
178 &self,
179 show: ShowCreateView,
180 query_ctx: QueryContextRef,
181 ) -> Result<Output> {
182 let (catalog, schema, view) = table_idents_to_full_name(&show.view_name, &query_ctx)
183 .map_err(BoxedError::new)
184 .context(ExternalSnafu)?;
185
186 let table_ref = self
187 .catalog_manager
188 .table(&catalog, &schema, &view, Some(&query_ctx))
189 .await
190 .context(CatalogSnafu)?
191 .context(ViewNotFoundSnafu { view_name: &view })?;
192
193 let view_id = table_ref.table_info().ident.table_id;
194
195 let view_info = self
196 .view_info_manager
197 .get(view_id)
198 .await
199 .context(FindViewInfoSnafu { view_name: &view })?
200 .context(ViewInfoNotFoundSnafu { view_name: &view })?;
201
202 query::sql::show_create_view(show.view_name, &view_info.definition, query_ctx)
203 .context(error::ExecuteStatementSnafu)
204 }
205
206 #[tracing::instrument(skip_all)]
207 pub(super) async fn show_views(
208 &self,
209 stmt: ShowViews,
210 query_ctx: QueryContextRef,
211 ) -> Result<Output> {
212 query::sql::show_views(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
213 .await
214 .context(ExecuteStatementSnafu)
215 }
216
217 #[tracing::instrument(skip_all)]
218 pub(super) async fn show_flows(
219 &self,
220 stmt: ShowFlows,
221 query_ctx: QueryContextRef,
222 ) -> Result<Output> {
223 query::sql::show_flows(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
224 .await
225 .context(ExecuteStatementSnafu)
226 }
227
228 #[cfg(feature = "enterprise")]
229 #[tracing::instrument(skip_all)]
230 pub(super) async fn show_triggers(
231 &self,
232 stmt: sql::statements::show::trigger::ShowTriggers,
233 query_ctx: QueryContextRef,
234 ) -> Result<Output> {
235 query::sql::show_triggers(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
236 .await
237 .context(ExecuteStatementSnafu)
238 }
239
240 #[tracing::instrument(skip_all)]
241 pub async fn show_create_flow(
242 &self,
243 show: ShowCreateFlow,
244 query_ctx: QueryContextRef,
245 ) -> Result<Output> {
246 let obj_name = &show.flow_name;
247 let (catalog_name, flow_name) = match &obj_name.0[..] {
248 [table] => (query_ctx.current_catalog().to_string(), table.value.clone()),
249 [catalog, table] => (catalog.value.clone(), table.value.clone()),
250 _ => {
251 return InvalidSqlSnafu {
252 err_msg: format!(
253 "expect flow name to be <catalog>.<flow_name> or <flow_name>, actual: {obj_name}",
254 ),
255 }
256 .fail()
257 }
258 };
259
260 let flow_name_val = self
261 .flow_metadata_manager
262 .flow_name_manager()
263 .get(&catalog_name, &flow_name)
264 .await
265 .context(error::TableMetadataManagerSnafu)?
266 .context(error::FlowNotFoundSnafu {
267 flow_name: &flow_name,
268 })?;
269
270 let flow_val = self
271 .flow_metadata_manager
272 .flow_info_manager()
273 .get(flow_name_val.flow_id())
274 .await
275 .context(error::TableMetadataManagerSnafu)?
276 .context(error::FlowNotFoundSnafu {
277 flow_name: &flow_name,
278 })?;
279
280 query::sql::show_create_flow(obj_name.clone(), flow_val, query_ctx)
281 .context(error::ExecuteStatementSnafu)
282 }
283
284 #[tracing::instrument(skip_all)]
285 pub fn show_variable(&self, stmt: ShowVariables, query_ctx: QueryContextRef) -> Result<Output> {
286 query::sql::show_variable(stmt, query_ctx).context(error::ExecuteStatementSnafu)
287 }
288
289 #[tracing::instrument(skip_all)]
290 pub async fn show_collation(
291 &self,
292 kind: ShowKind,
293 query_ctx: QueryContextRef,
294 ) -> Result<Output> {
295 query::sql::show_collations(kind, &self.query_engine, &self.catalog_manager, query_ctx)
296 .await
297 .context(error::ExecuteStatementSnafu)
298 }
299
300 #[tracing::instrument(skip_all)]
301 pub async fn show_charset(&self, kind: ShowKind, query_ctx: QueryContextRef) -> Result<Output> {
302 query::sql::show_charsets(kind, &self.query_engine, &self.catalog_manager, query_ctx)
303 .await
304 .context(error::ExecuteStatementSnafu)
305 }
306
307 #[tracing::instrument(skip_all)]
308 pub async fn show_status(&self, query_ctx: QueryContextRef) -> Result<Output> {
309 query::sql::show_status(query_ctx)
310 .await
311 .context(error::ExecuteStatementSnafu)
312 }
313 pub async fn show_search_path(&self, query_ctx: QueryContextRef) -> Result<Output> {
314 query::sql::show_search_path(query_ctx)
315 .await
316 .context(error::ExecuteStatementSnafu)
317 }
318
319 pub async fn show_processlist(
320 &self,
321 stmt: ShowProcessList,
322 query_ctx: QueryContextRef,
323 ) -> Result<Output> {
324 query::sql::show_processlist(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
325 .await
326 .context(ExecLogicalPlanSnafu)
327 }
328}
329
330pub(crate) fn create_partitions_stmt(
331 table_info: &TableInfo,
332 partitions: Vec<PartitionInfo>,
333) -> Result<Option<Partitions>> {
334 if partitions.is_empty() {
335 return Ok(None);
336 }
337
338 let column_list = table_info
339 .meta
340 .partition_column_names()
341 .map(|name| name[..].into())
342 .collect();
343
344 let exprs = partitions
345 .iter()
346 .filter_map(|partition| {
347 partition
348 .partition_expr
349 .as_ref()
350 .map(|expr| expr.to_parser_expr())
351 })
352 .collect();
353
354 Ok(Some(Partitions { column_list, exprs }))
355}