1use std::sync::Arc;
16
17use common_error::ext::BoxedError;
18use common_meta::key::schema_name::SchemaNameKey;
19use common_query::Output;
20use common_telemetry::tracing;
21use partition::manager::PartitionInfo;
22use session::context::QueryContextRef;
23use session::table_name::table_idents_to_full_name;
24use snafu::{OptionExt, ResultExt};
25use sql::ast::ObjectNamePartExt;
26use sql::statements::OptionMap;
27use sql::statements::create::Partitions;
28use sql::statements::show::{
29 ShowColumns, ShowCreateFlow, ShowCreateView, ShowDatabases, ShowFlows, ShowIndex, ShowKind,
30 ShowProcessList, ShowRegion, ShowTableStatus, ShowTables, ShowVariables, ShowViews,
31};
32use table::TableRef;
33use table::metadata::{TableInfo, TableType};
34use table::table_name::TableName;
35
36use crate::error::{
37 self, CatalogSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, ExternalSnafu,
38 FindViewInfoSnafu, InvalidSqlSnafu, Result, TableMetadataManagerSnafu, ViewInfoNotFoundSnafu,
39 ViewNotFoundSnafu,
40};
41use crate::statement::StatementExecutor;
42
43impl StatementExecutor {
44 #[tracing::instrument(skip_all)]
45 pub(super) async fn show_databases(
46 &self,
47 stmt: ShowDatabases,
48 query_ctx: QueryContextRef,
49 ) -> Result<Output> {
50 query::sql::show_databases(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
51 .await
52 .context(ExecuteStatementSnafu)
53 }
54
55 #[tracing::instrument(skip_all)]
56 pub(super) async fn show_tables(
57 &self,
58 stmt: ShowTables,
59 query_ctx: QueryContextRef,
60 ) -> Result<Output> {
61 query::sql::show_tables(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
62 .await
63 .context(ExecuteStatementSnafu)
64 }
65
66 #[tracing::instrument(skip_all)]
67 pub(super) async fn show_table_status(
68 &self,
69 stmt: ShowTableStatus,
70 query_ctx: QueryContextRef,
71 ) -> Result<Output> {
72 query::sql::show_table_status(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
73 .await
74 .context(ExecuteStatementSnafu)
75 }
76
77 #[tracing::instrument(skip_all)]
78 pub(super) async fn show_columns(
79 &self,
80 stmt: ShowColumns,
81 query_ctx: QueryContextRef,
82 ) -> Result<Output> {
83 query::sql::show_columns(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
84 .await
85 .context(ExecuteStatementSnafu)
86 }
87
88 #[tracing::instrument(skip_all)]
89 pub(super) async fn show_index(
90 &self,
91 stmt: ShowIndex,
92 query_ctx: QueryContextRef,
93 ) -> Result<Output> {
94 query::sql::show_index(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
95 .await
96 .context(ExecuteStatementSnafu)
97 }
98
99 pub(super) async fn show_region(
100 &self,
101 stmt: ShowRegion,
102 query_ctx: QueryContextRef,
103 ) -> Result<Output> {
104 query::sql::show_region(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
105 .await
106 .context(ExecuteStatementSnafu)
107 }
108
109 #[tracing::instrument(skip_all)]
110 pub async fn show_create_database(
111 &self,
112 database_name: &str,
113 opts: OptionMap,
114 ) -> Result<Output> {
115 query::sql::show_create_database(database_name, opts).context(ExecuteStatementSnafu)
116 }
117
118 #[tracing::instrument(skip_all)]
119 pub async fn show_create_table(
120 &self,
121 table_name: TableName,
122 table: TableRef,
123 query_ctx: QueryContextRef,
124 ) -> Result<Output> {
125 let mut table_info = table.table_info();
126 let partition_column_names: Vec<_> =
127 table_info.meta.partition_column_names().cloned().collect();
128
129 if let Some(latest) = self
130 .table_metadata_manager
131 .table_info_manager()
132 .get(table_info.table_id())
133 .await
134 .context(TableMetadataManagerSnafu)?
135 {
136 let mut latest_info = TableInfo::try_from(latest.into_inner().table_info)
137 .context(error::CreateTableInfoSnafu)?;
138
139 if !partition_column_names.is_empty() {
140 latest_info.meta.partition_key_indices = partition_column_names
141 .iter()
142 .filter_map(|name| latest_info.meta.schema.column_index_by_name(name.as_str()))
143 .collect();
144 }
145
146 table_info = Arc::new(latest_info);
147 }
148
149 if table_info.table_type != TableType::Base {
150 return error::ShowCreateTableBaseOnlySnafu {
151 table_name: table_name.to_string(),
152 table_type: table_info.table_type,
153 }
154 .fail();
155 }
156
157 let schema_options = self
158 .table_metadata_manager
159 .schema_manager()
160 .get(SchemaNameKey {
161 catalog: &table_name.catalog_name,
162 schema: &table_name.schema_name,
163 })
164 .await
165 .context(TableMetadataManagerSnafu)?
166 .map(|v| v.into_inner());
167
168 let partitions = self
169 .partition_manager
170 .find_table_partitions(table_info.table_id())
171 .await
172 .context(error::FindTablePartitionRuleSnafu {
173 table_name: &table_name.table_name,
174 })?;
175
176 let partitions = create_partitions_stmt(&table_info, partitions)?;
177
178 query::sql::show_create_table(table_info, schema_options, partitions, query_ctx)
179 .context(ExecuteStatementSnafu)
180 }
181
182 #[tracing::instrument(skip_all)]
183 pub async fn show_create_table_for_pg(
184 &self,
185 table_name: TableName,
186 table: TableRef,
187 query_ctx: QueryContextRef,
188 ) -> Result<Output> {
189 let table_info = table.table_info();
190 if table_info.table_type != TableType::Base {
191 return error::ShowCreateTableBaseOnlySnafu {
192 table_name: table_name.to_string(),
193 table_type: table_info.table_type,
194 }
195 .fail();
196 }
197
198 query::sql::show_create_foreign_table_for_pg(table, query_ctx)
199 .context(ExecuteStatementSnafu)
200 }
201
202 #[tracing::instrument(skip_all)]
203 pub async fn show_create_view(
204 &self,
205 show: ShowCreateView,
206 query_ctx: QueryContextRef,
207 ) -> Result<Output> {
208 let (catalog, schema, view) = table_idents_to_full_name(&show.view_name, &query_ctx)
209 .map_err(BoxedError::new)
210 .context(ExternalSnafu)?;
211
212 let table_ref = self
213 .catalog_manager
214 .table(&catalog, &schema, &view, Some(&query_ctx))
215 .await
216 .context(CatalogSnafu)?
217 .context(ViewNotFoundSnafu { view_name: &view })?;
218
219 let view_id = table_ref.table_info().ident.table_id;
220
221 let view_info = self
222 .view_info_manager
223 .get(view_id)
224 .await
225 .context(FindViewInfoSnafu { view_name: &view })?
226 .context(ViewInfoNotFoundSnafu { view_name: &view })?;
227
228 query::sql::show_create_view(show.view_name, &view_info.definition, query_ctx)
229 .context(error::ExecuteStatementSnafu)
230 }
231
232 #[tracing::instrument(skip_all)]
233 pub(super) async fn show_views(
234 &self,
235 stmt: ShowViews,
236 query_ctx: QueryContextRef,
237 ) -> Result<Output> {
238 query::sql::show_views(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
239 .await
240 .context(ExecuteStatementSnafu)
241 }
242
243 #[tracing::instrument(skip_all)]
244 pub(super) async fn show_flows(
245 &self,
246 stmt: ShowFlows,
247 query_ctx: QueryContextRef,
248 ) -> Result<Output> {
249 query::sql::show_flows(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
250 .await
251 .context(ExecuteStatementSnafu)
252 }
253
254 #[cfg(feature = "enterprise")]
255 #[tracing::instrument(skip_all)]
256 pub(super) async fn show_triggers(
257 &self,
258 stmt: sql::statements::show::trigger::ShowTriggers,
259 query_ctx: QueryContextRef,
260 ) -> Result<Output> {
261 query::sql::show_triggers(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
262 .await
263 .context(ExecuteStatementSnafu)
264 }
265
266 #[tracing::instrument(skip_all)]
267 pub async fn show_create_flow(
268 &self,
269 show: ShowCreateFlow,
270 query_ctx: QueryContextRef,
271 ) -> Result<Output> {
272 let obj_name = &show.flow_name;
273 let (catalog_name, flow_name) = match &obj_name.0[..] {
274 [flow] => (query_ctx.current_catalog().to_string(), flow.to_string_unquoted()),
275 [catalog, flow] => (catalog.to_string_unquoted(), flow.to_string_unquoted()),
276 _ => {
277 return InvalidSqlSnafu {
278 err_msg: format!(
279 "expect flow name to be <catalog>.<flow_name> or <flow_name>, actual: {obj_name}",
280 ),
281 }
282 .fail()
283 }
284 };
285
286 let flow_name_val = self
287 .flow_metadata_manager
288 .flow_name_manager()
289 .get(&catalog_name, &flow_name)
290 .await
291 .context(error::TableMetadataManagerSnafu)?
292 .context(error::FlowNotFoundSnafu {
293 flow_name: &flow_name,
294 })?;
295
296 let flow_val = self
297 .flow_metadata_manager
298 .flow_info_manager()
299 .get(flow_name_val.flow_id())
300 .await
301 .context(error::TableMetadataManagerSnafu)?
302 .context(error::FlowNotFoundSnafu {
303 flow_name: &flow_name,
304 })?;
305
306 query::sql::show_create_flow(obj_name.clone(), flow_val, query_ctx)
307 .context(error::ExecuteStatementSnafu)
308 }
309
310 #[cfg(feature = "enterprise")]
311 #[tracing::instrument(skip_all)]
312 pub async fn show_create_trigger(
313 &self,
314 show: sql::statements::show::trigger::ShowCreateTrigger,
315 query_ctx: QueryContextRef,
316 ) -> Result<Output> {
317 let Some(trigger_querier) = self.trigger_querier.as_ref() else {
318 return error::MissingTriggerQuerierSnafu.fail();
319 };
320
321 let obj_name = &show.trigger_name;
322 let (catalog_name, trigger_name) = match &obj_name.0[..] {
323 [trigger] => (query_ctx.current_catalog().to_string(), trigger.to_string_unquoted()),
324 [catalog, trigger] => (catalog.to_string_unquoted(), trigger.to_string_unquoted()),
325 _ => {
326 return InvalidSqlSnafu {
327 err_msg: format!(
328 "expect trigger name to be <catalog>.<trigger_name> or <trigger_name>, actual: {obj_name}",
329 ),
330 }
331 .fail()
332 }
333 };
334 trigger_querier
335 .show_create_trigger(&catalog_name, &trigger_name, &query_ctx)
336 .await
337 .context(error::TriggerQuerierSnafu)
338 }
339
340 #[tracing::instrument(skip_all)]
341 pub fn show_variable(&self, stmt: ShowVariables, query_ctx: QueryContextRef) -> Result<Output> {
342 query::sql::show_variable(stmt, query_ctx).context(error::ExecuteStatementSnafu)
343 }
344
345 #[tracing::instrument(skip_all)]
346 pub async fn show_collation(
347 &self,
348 kind: ShowKind,
349 query_ctx: QueryContextRef,
350 ) -> Result<Output> {
351 query::sql::show_collations(kind, &self.query_engine, &self.catalog_manager, query_ctx)
352 .await
353 .context(error::ExecuteStatementSnafu)
354 }
355
356 #[tracing::instrument(skip_all)]
357 pub async fn show_charset(&self, kind: ShowKind, query_ctx: QueryContextRef) -> Result<Output> {
358 query::sql::show_charsets(kind, &self.query_engine, &self.catalog_manager, query_ctx)
359 .await
360 .context(error::ExecuteStatementSnafu)
361 }
362
363 #[tracing::instrument(skip_all)]
364 pub async fn show_status(&self, query_ctx: QueryContextRef) -> Result<Output> {
365 query::sql::show_status(query_ctx)
366 .await
367 .context(error::ExecuteStatementSnafu)
368 }
369 pub async fn show_search_path(&self, query_ctx: QueryContextRef) -> Result<Output> {
370 query::sql::show_search_path(query_ctx)
371 .await
372 .context(error::ExecuteStatementSnafu)
373 }
374
375 pub async fn show_processlist(
376 &self,
377 stmt: ShowProcessList,
378 query_ctx: QueryContextRef,
379 ) -> Result<Output> {
380 query::sql::show_processlist(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
381 .await
382 .context(ExecLogicalPlanSnafu)
383 }
384}
385
386pub(crate) fn create_partitions_stmt(
387 table_info: &TableInfo,
388 partitions: Vec<PartitionInfo>,
389) -> Result<Option<Partitions>> {
390 if partitions.is_empty() {
391 return Ok(None);
392 }
393
394 let column_list = table_info
395 .meta
396 .partition_column_names()
397 .map(|name| name[..].into())
398 .collect();
399
400 let exprs = partitions
401 .iter()
402 .filter_map(|partition| {
403 partition
404 .partition_expr
405 .as_ref()
406 .map(|expr| expr.to_parser_expr())
407 })
408 .collect();
409
410 Ok(Some(Partitions { column_list, exprs }))
411}