1use std::sync::Arc;
16
17use common_error::ext::BoxedError;
18use common_meta::key::schema_name::SchemaNameKey;
19use common_query::Output;
20use common_telemetry::tracing;
21use partition::manager::PartitionInfoWithVersion;
22use session::context::QueryContextRef;
23use session::table_name::table_idents_to_full_name;
24use snafu::{OptionExt, ResultExt};
25use sql::ast::ObjectNamePartExt;
26use sql::statements::OptionMap;
27use sql::statements::create::Partitions;
28use sql::statements::show::{
29 ShowColumns, ShowCreateFlow, ShowCreateView, ShowDatabases, ShowFlows, ShowIndex, ShowKind,
30 ShowProcessList, ShowRegion, ShowTableStatus, ShowTables, ShowVariables, ShowViews,
31};
32use table::TableRef;
33use table::metadata::{TableInfo, TableType};
34use table::table_name::TableName;
35
36use crate::error::{
37 self, CatalogSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, ExternalSnafu,
38 FindViewInfoSnafu, InvalidSqlSnafu, Result, TableMetadataManagerSnafu, ViewInfoNotFoundSnafu,
39 ViewNotFoundSnafu,
40};
41use crate::statement::StatementExecutor;
42
43impl StatementExecutor {
44 #[tracing::instrument(skip_all)]
45 pub(super) async fn show_databases(
46 &self,
47 stmt: ShowDatabases,
48 query_ctx: QueryContextRef,
49 ) -> Result<Output> {
50 query::sql::show_databases(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
51 .await
52 .context(ExecuteStatementSnafu)
53 }
54
55 #[tracing::instrument(skip_all)]
56 pub(super) async fn show_tables(
57 &self,
58 stmt: ShowTables,
59 query_ctx: QueryContextRef,
60 ) -> Result<Output> {
61 query::sql::show_tables(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
62 .await
63 .context(ExecuteStatementSnafu)
64 }
65
66 #[tracing::instrument(skip_all)]
67 pub(super) async fn show_table_status(
68 &self,
69 stmt: ShowTableStatus,
70 query_ctx: QueryContextRef,
71 ) -> Result<Output> {
72 query::sql::show_table_status(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
73 .await
74 .context(ExecuteStatementSnafu)
75 }
76
77 #[tracing::instrument(skip_all)]
78 pub(super) async fn show_columns(
79 &self,
80 stmt: ShowColumns,
81 query_ctx: QueryContextRef,
82 ) -> Result<Output> {
83 query::sql::show_columns(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
84 .await
85 .context(ExecuteStatementSnafu)
86 }
87
88 #[tracing::instrument(skip_all)]
89 pub(super) async fn show_index(
90 &self,
91 stmt: ShowIndex,
92 query_ctx: QueryContextRef,
93 ) -> Result<Output> {
94 query::sql::show_index(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
95 .await
96 .context(ExecuteStatementSnafu)
97 }
98
99 pub(super) async fn show_region(
100 &self,
101 stmt: ShowRegion,
102 query_ctx: QueryContextRef,
103 ) -> Result<Output> {
104 query::sql::show_region(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
105 .await
106 .context(ExecuteStatementSnafu)
107 }
108
109 #[tracing::instrument(skip_all)]
110 pub async fn show_create_database(
111 &self,
112 database_name: &str,
113 opts: OptionMap,
114 ) -> Result<Output> {
115 query::sql::show_create_database(database_name, opts).context(ExecuteStatementSnafu)
116 }
117
118 #[tracing::instrument(skip_all)]
119 pub async fn show_create_table(
120 &self,
121 table_name: TableName,
122 table: TableRef,
123 query_ctx: QueryContextRef,
124 ) -> Result<Output> {
125 let mut table_info = table.table_info();
126 let partition_column_names: Vec<_> =
127 table_info.meta.partition_column_names().cloned().collect();
128
129 if let Some(latest) = self
130 .table_metadata_manager
131 .table_info_manager()
132 .get(table_info.table_id())
133 .await
134 .context(TableMetadataManagerSnafu)?
135 {
136 let mut latest_info = latest.into_inner().table_info;
137
138 if !partition_column_names.is_empty() {
139 latest_info.meta.partition_key_indices = partition_column_names
140 .iter()
141 .filter_map(|name| latest_info.meta.schema.column_index_by_name(name.as_str()))
142 .collect();
143 }
144
145 table_info = Arc::new(latest_info);
146 }
147
148 if table_info.table_type != TableType::Base {
149 return error::ShowCreateTableBaseOnlySnafu {
150 table_name: table_name.to_string(),
151 table_type: table_info.table_type,
152 }
153 .fail();
154 }
155
156 let schema_options = self
157 .table_metadata_manager
158 .schema_manager()
159 .get(SchemaNameKey {
160 catalog: &table_name.catalog_name,
161 schema: &table_name.schema_name,
162 })
163 .await
164 .context(TableMetadataManagerSnafu)?
165 .map(|v| v.into_inner());
166
167 let partition_info = self
168 .partition_manager
169 .find_physical_partition_info(table_info.table_id())
170 .await
171 .context(error::FindTablePartitionRuleSnafu {
172 table_name: &table_name.table_name,
173 })?;
174
175 let partitions = create_partitions_stmt(&table_info, &partition_info.partitions)?;
176
177 query::sql::show_create_table(table_info, schema_options, partitions, query_ctx)
178 .context(ExecuteStatementSnafu)
179 }
180
181 #[tracing::instrument(skip_all)]
182 pub async fn show_create_table_for_pg(
183 &self,
184 table_name: TableName,
185 table: TableRef,
186 query_ctx: QueryContextRef,
187 ) -> Result<Output> {
188 let table_info = table.table_info();
189 if table_info.table_type != TableType::Base {
190 return error::ShowCreateTableBaseOnlySnafu {
191 table_name: table_name.to_string(),
192 table_type: table_info.table_type,
193 }
194 .fail();
195 }
196
197 query::sql::show_create_foreign_table_for_pg(table, query_ctx)
198 .context(ExecuteStatementSnafu)
199 }
200
201 #[tracing::instrument(skip_all)]
202 pub async fn show_create_view(
203 &self,
204 show: ShowCreateView,
205 query_ctx: QueryContextRef,
206 ) -> Result<Output> {
207 let (catalog, schema, view) = table_idents_to_full_name(&show.view_name, &query_ctx)
208 .map_err(BoxedError::new)
209 .context(ExternalSnafu)?;
210
211 let table_ref = self
212 .catalog_manager
213 .table(&catalog, &schema, &view, Some(&query_ctx))
214 .await
215 .context(CatalogSnafu)?
216 .context(ViewNotFoundSnafu { view_name: &view })?;
217
218 let view_id = table_ref.table_info().ident.table_id;
219
220 let view_info = self
221 .view_info_manager
222 .get(view_id)
223 .await
224 .context(FindViewInfoSnafu { view_name: &view })?
225 .context(ViewInfoNotFoundSnafu { view_name: &view })?;
226
227 query::sql::show_create_view(show.view_name, &view_info.definition, query_ctx)
228 .context(error::ExecuteStatementSnafu)
229 }
230
231 #[tracing::instrument(skip_all)]
232 pub(super) async fn show_views(
233 &self,
234 stmt: ShowViews,
235 query_ctx: QueryContextRef,
236 ) -> Result<Output> {
237 query::sql::show_views(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
238 .await
239 .context(ExecuteStatementSnafu)
240 }
241
242 #[tracing::instrument(skip_all)]
243 pub(super) async fn show_flows(
244 &self,
245 stmt: ShowFlows,
246 query_ctx: QueryContextRef,
247 ) -> Result<Output> {
248 query::sql::show_flows(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
249 .await
250 .context(ExecuteStatementSnafu)
251 }
252
253 #[cfg(feature = "enterprise")]
254 #[tracing::instrument(skip_all)]
255 pub(super) async fn show_triggers(
256 &self,
257 stmt: sql::statements::show::trigger::ShowTriggers,
258 query_ctx: QueryContextRef,
259 ) -> Result<Output> {
260 query::sql::show_triggers(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
261 .await
262 .context(ExecuteStatementSnafu)
263 }
264
265 #[tracing::instrument(skip_all)]
266 pub async fn show_create_flow(
267 &self,
268 show: ShowCreateFlow,
269 query_ctx: QueryContextRef,
270 ) -> Result<Output> {
271 let obj_name = &show.flow_name;
272 let (catalog_name, flow_name) = match &obj_name.0[..] {
273 [flow] => (query_ctx.current_catalog().to_string(), flow.to_string_unquoted()),
274 [catalog, flow] => (catalog.to_string_unquoted(), flow.to_string_unquoted()),
275 _ => {
276 return InvalidSqlSnafu {
277 err_msg: format!(
278 "expect flow name to be <catalog>.<flow_name> or <flow_name>, actual: {obj_name}",
279 ),
280 }
281 .fail()
282 }
283 };
284
285 let flow_name_val = self
286 .flow_metadata_manager
287 .flow_name_manager()
288 .get(&catalog_name, &flow_name)
289 .await
290 .context(error::TableMetadataManagerSnafu)?
291 .context(error::FlowNotFoundSnafu {
292 flow_name: &flow_name,
293 })?;
294
295 let flow_val = self
296 .flow_metadata_manager
297 .flow_info_manager()
298 .get(flow_name_val.flow_id())
299 .await
300 .context(error::TableMetadataManagerSnafu)?
301 .context(error::FlowNotFoundSnafu {
302 flow_name: &flow_name,
303 })?;
304
305 query::sql::show_create_flow(obj_name.clone(), flow_val, query_ctx)
306 .context(error::ExecuteStatementSnafu)
307 }
308
309 #[cfg(feature = "enterprise")]
310 #[tracing::instrument(skip_all)]
311 pub async fn show_create_trigger(
312 &self,
313 show: sql::statements::show::trigger::ShowCreateTrigger,
314 query_ctx: QueryContextRef,
315 ) -> Result<Output> {
316 let Some(trigger_querier) = self.trigger_querier.as_ref() else {
317 return error::MissingTriggerQuerierSnafu.fail();
318 };
319
320 let obj_name = &show.trigger_name;
321 let (catalog_name, trigger_name) = match &obj_name.0[..] {
322 [trigger] => (query_ctx.current_catalog().to_string(), trigger.to_string_unquoted()),
323 [catalog, trigger] => (catalog.to_string_unquoted(), trigger.to_string_unquoted()),
324 _ => {
325 return InvalidSqlSnafu {
326 err_msg: format!(
327 "expect trigger name to be <catalog>.<trigger_name> or <trigger_name>, actual: {obj_name}",
328 ),
329 }
330 .fail()
331 }
332 };
333 trigger_querier
334 .show_create_trigger(&catalog_name, &trigger_name, &query_ctx)
335 .await
336 .context(error::TriggerQuerierSnafu)
337 }
338
339 #[tracing::instrument(skip_all)]
340 pub fn show_variable(&self, stmt: ShowVariables, query_ctx: QueryContextRef) -> Result<Output> {
341 query::sql::show_variable(stmt, query_ctx).context(error::ExecuteStatementSnafu)
342 }
343
344 #[tracing::instrument(skip_all)]
345 pub async fn show_collation(
346 &self,
347 kind: ShowKind,
348 query_ctx: QueryContextRef,
349 ) -> Result<Output> {
350 query::sql::show_collations(kind, &self.query_engine, &self.catalog_manager, query_ctx)
351 .await
352 .context(error::ExecuteStatementSnafu)
353 }
354
355 #[tracing::instrument(skip_all)]
356 pub async fn show_charset(&self, kind: ShowKind, query_ctx: QueryContextRef) -> Result<Output> {
357 query::sql::show_charsets(kind, &self.query_engine, &self.catalog_manager, query_ctx)
358 .await
359 .context(error::ExecuteStatementSnafu)
360 }
361
362 #[tracing::instrument(skip_all)]
363 pub async fn show_status(&self, query_ctx: QueryContextRef) -> Result<Output> {
364 query::sql::show_status(query_ctx)
365 .await
366 .context(error::ExecuteStatementSnafu)
367 }
368 pub async fn show_search_path(&self, query_ctx: QueryContextRef) -> Result<Output> {
369 query::sql::show_search_path(query_ctx)
370 .await
371 .context(error::ExecuteStatementSnafu)
372 }
373
374 pub async fn show_processlist(
375 &self,
376 stmt: ShowProcessList,
377 query_ctx: QueryContextRef,
378 ) -> Result<Output> {
379 query::sql::show_processlist(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
380 .await
381 .context(ExecLogicalPlanSnafu)
382 }
383}
384
385pub(crate) fn create_partitions_stmt(
386 table_info: &TableInfo,
387 partitions: &[PartitionInfoWithVersion],
388) -> Result<Option<Partitions>> {
389 if partitions.is_empty() {
390 return Ok(None);
391 }
392
393 let column_list = table_info
394 .meta
395 .partition_column_names()
396 .map(|name| name[..].into())
397 .collect();
398
399 let exprs = partitions
400 .iter()
401 .filter_map(|partition| {
402 partition
403 .partition_expr
404 .as_ref()
405 .map(|expr| expr.to_parser_expr())
406 })
407 .collect();
408
409 Ok(Some(Partitions { column_list, exprs }))
410}