1use common_error::ext::BoxedError;
16use common_meta::key::schema_name::SchemaNameKey;
17use common_query::Output;
18use common_telemetry::tracing;
19use partition::manager::PartitionInfo;
20use partition::partition::PartitionBound;
21use session::context::QueryContextRef;
22use session::table_name::table_idents_to_full_name;
23use snafu::{OptionExt, ResultExt};
24use sql::ast::Ident;
25use sql::statements::create::Partitions;
26use sql::statements::show::{
27 ShowColumns, ShowCreateFlow, ShowCreateView, ShowDatabases, ShowFlows, ShowIndex, ShowKind,
28 ShowProcessList, ShowRegion, ShowTableStatus, ShowTables, ShowVariables, ShowViews,
29};
30use sql::statements::OptionMap;
31use table::metadata::TableType;
32use table::table_name::TableName;
33use table::TableRef;
34
35use crate::error::{
36 self, CatalogSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, ExternalSnafu,
37 FindViewInfoSnafu, InvalidSqlSnafu, Result, TableMetadataManagerSnafu, ViewInfoNotFoundSnafu,
38 ViewNotFoundSnafu,
39};
40use crate::statement::StatementExecutor;
41
42impl StatementExecutor {
43 #[tracing::instrument(skip_all)]
44 pub(super) async fn show_databases(
45 &self,
46 stmt: ShowDatabases,
47 query_ctx: QueryContextRef,
48 ) -> Result<Output> {
49 query::sql::show_databases(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
50 .await
51 .context(ExecuteStatementSnafu)
52 }
53
54 #[tracing::instrument(skip_all)]
55 pub(super) async fn show_tables(
56 &self,
57 stmt: ShowTables,
58 query_ctx: QueryContextRef,
59 ) -> Result<Output> {
60 query::sql::show_tables(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
61 .await
62 .context(ExecuteStatementSnafu)
63 }
64
65 #[tracing::instrument(skip_all)]
66 pub(super) async fn show_table_status(
67 &self,
68 stmt: ShowTableStatus,
69 query_ctx: QueryContextRef,
70 ) -> Result<Output> {
71 query::sql::show_table_status(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
72 .await
73 .context(ExecuteStatementSnafu)
74 }
75
76 #[tracing::instrument(skip_all)]
77 pub(super) async fn show_columns(
78 &self,
79 stmt: ShowColumns,
80 query_ctx: QueryContextRef,
81 ) -> Result<Output> {
82 query::sql::show_columns(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
83 .await
84 .context(ExecuteStatementSnafu)
85 }
86
87 #[tracing::instrument(skip_all)]
88 pub(super) async fn show_index(
89 &self,
90 stmt: ShowIndex,
91 query_ctx: QueryContextRef,
92 ) -> Result<Output> {
93 query::sql::show_index(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
94 .await
95 .context(ExecuteStatementSnafu)
96 }
97
98 pub(super) async fn show_region(
99 &self,
100 stmt: ShowRegion,
101 query_ctx: QueryContextRef,
102 ) -> Result<Output> {
103 query::sql::show_region(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
104 .await
105 .context(ExecuteStatementSnafu)
106 }
107
108 #[tracing::instrument(skip_all)]
109 pub async fn show_create_database(
110 &self,
111 database_name: &str,
112 opts: OptionMap,
113 ) -> Result<Output> {
114 query::sql::show_create_database(database_name, opts).context(ExecuteStatementSnafu)
115 }
116
117 #[tracing::instrument(skip_all)]
118 pub async fn show_create_table(
119 &self,
120 table_name: TableName,
121 table: TableRef,
122 query_ctx: QueryContextRef,
123 ) -> Result<Output> {
124 let table_info = table.table_info();
125 if table_info.table_type != TableType::Base {
126 return error::ShowCreateTableBaseOnlySnafu {
127 table_name: table_name.to_string(),
128 table_type: table_info.table_type,
129 }
130 .fail();
131 }
132
133 let schema_options = self
134 .table_metadata_manager
135 .schema_manager()
136 .get(SchemaNameKey {
137 catalog: &table_name.catalog_name,
138 schema: &table_name.schema_name,
139 })
140 .await
141 .context(TableMetadataManagerSnafu)?
142 .map(|v| v.into_inner());
143
144 let partitions = self
145 .partition_manager
146 .find_table_partitions(table.table_info().table_id())
147 .await
148 .context(error::FindTablePartitionRuleSnafu {
149 table_name: &table_name.table_name,
150 })?;
151
152 let partitions = create_partitions_stmt(partitions)?;
153
154 query::sql::show_create_table(table, schema_options, partitions, query_ctx)
155 .context(ExecuteStatementSnafu)
156 }
157
158 #[tracing::instrument(skip_all)]
159 pub async fn show_create_table_for_pg(
160 &self,
161 table_name: TableName,
162 table: TableRef,
163 query_ctx: QueryContextRef,
164 ) -> Result<Output> {
165 let table_info = table.table_info();
166 if table_info.table_type != TableType::Base {
167 return error::ShowCreateTableBaseOnlySnafu {
168 table_name: table_name.to_string(),
169 table_type: table_info.table_type,
170 }
171 .fail();
172 }
173
174 query::sql::show_create_foreign_table_for_pg(table, query_ctx)
175 .context(ExecuteStatementSnafu)
176 }
177
178 #[tracing::instrument(skip_all)]
179 pub async fn show_create_view(
180 &self,
181 show: ShowCreateView,
182 query_ctx: QueryContextRef,
183 ) -> Result<Output> {
184 let (catalog, schema, view) = table_idents_to_full_name(&show.view_name, &query_ctx)
185 .map_err(BoxedError::new)
186 .context(ExternalSnafu)?;
187
188 let table_ref = self
189 .catalog_manager
190 .table(&catalog, &schema, &view, Some(&query_ctx))
191 .await
192 .context(CatalogSnafu)?
193 .context(ViewNotFoundSnafu { view_name: &view })?;
194
195 let view_id = table_ref.table_info().ident.table_id;
196
197 let view_info = self
198 .view_info_manager
199 .get(view_id)
200 .await
201 .context(FindViewInfoSnafu { view_name: &view })?
202 .context(ViewInfoNotFoundSnafu { view_name: &view })?;
203
204 query::sql::show_create_view(show.view_name, &view_info.definition, query_ctx)
205 .context(error::ExecuteStatementSnafu)
206 }
207
208 #[tracing::instrument(skip_all)]
209 pub(super) async fn show_views(
210 &self,
211 stmt: ShowViews,
212 query_ctx: QueryContextRef,
213 ) -> Result<Output> {
214 query::sql::show_views(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
215 .await
216 .context(ExecuteStatementSnafu)
217 }
218
219 #[tracing::instrument(skip_all)]
220 pub(super) async fn show_flows(
221 &self,
222 stmt: ShowFlows,
223 query_ctx: QueryContextRef,
224 ) -> Result<Output> {
225 query::sql::show_flows(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
226 .await
227 .context(ExecuteStatementSnafu)
228 }
229
230 #[cfg(feature = "enterprise")]
231 #[tracing::instrument(skip_all)]
232 pub(super) async fn show_triggers(
233 &self,
234 _stmt: sql::statements::show::trigger::ShowTriggers,
235 _query_ctx: QueryContextRef,
236 ) -> Result<Output> {
237 crate::error::UnsupportedTriggerSnafu.fail()
238 }
239
240 #[tracing::instrument(skip_all)]
241 pub async fn show_create_flow(
242 &self,
243 show: ShowCreateFlow,
244 query_ctx: QueryContextRef,
245 ) -> Result<Output> {
246 let obj_name = &show.flow_name;
247 let (catalog_name, flow_name) = match &obj_name.0[..] {
248 [table] => (query_ctx.current_catalog().to_string(), table.value.clone()),
249 [catalog, table] => (catalog.value.clone(), table.value.clone()),
250 _ => {
251 return InvalidSqlSnafu {
252 err_msg: format!(
253 "expect flow name to be <catalog>.<flow_name> or <flow_name>, actual: {obj_name}",
254 ),
255 }
256 .fail()
257 }
258 };
259
260 let flow_name_val = self
261 .flow_metadata_manager
262 .flow_name_manager()
263 .get(&catalog_name, &flow_name)
264 .await
265 .context(error::TableMetadataManagerSnafu)?
266 .context(error::FlowNotFoundSnafu {
267 flow_name: &flow_name,
268 })?;
269
270 let flow_val = self
271 .flow_metadata_manager
272 .flow_info_manager()
273 .get(flow_name_val.flow_id())
274 .await
275 .context(error::TableMetadataManagerSnafu)?
276 .context(error::FlowNotFoundSnafu {
277 flow_name: &flow_name,
278 })?;
279
280 query::sql::show_create_flow(obj_name.clone(), flow_val, query_ctx)
281 .context(error::ExecuteStatementSnafu)
282 }
283
284 #[tracing::instrument(skip_all)]
285 pub fn show_variable(&self, stmt: ShowVariables, query_ctx: QueryContextRef) -> Result<Output> {
286 query::sql::show_variable(stmt, query_ctx).context(error::ExecuteStatementSnafu)
287 }
288
289 #[tracing::instrument(skip_all)]
290 pub async fn show_collation(
291 &self,
292 kind: ShowKind,
293 query_ctx: QueryContextRef,
294 ) -> Result<Output> {
295 query::sql::show_collations(kind, &self.query_engine, &self.catalog_manager, query_ctx)
296 .await
297 .context(error::ExecuteStatementSnafu)
298 }
299
300 #[tracing::instrument(skip_all)]
301 pub async fn show_charset(&self, kind: ShowKind, query_ctx: QueryContextRef) -> Result<Output> {
302 query::sql::show_charsets(kind, &self.query_engine, &self.catalog_manager, query_ctx)
303 .await
304 .context(error::ExecuteStatementSnafu)
305 }
306
307 #[tracing::instrument(skip_all)]
308 pub async fn show_status(&self, query_ctx: QueryContextRef) -> Result<Output> {
309 query::sql::show_status(query_ctx)
310 .await
311 .context(error::ExecuteStatementSnafu)
312 }
313 pub async fn show_search_path(&self, query_ctx: QueryContextRef) -> Result<Output> {
314 query::sql::show_search_path(query_ctx)
315 .await
316 .context(error::ExecuteStatementSnafu)
317 }
318
319 pub async fn show_processlist(
320 &self,
321 stmt: ShowProcessList,
322 query_ctx: QueryContextRef,
323 ) -> Result<Output> {
324 query::sql::show_processlist(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
325 .await
326 .context(ExecLogicalPlanSnafu)
327 }
328}
329
330pub(crate) fn create_partitions_stmt(partitions: Vec<PartitionInfo>) -> Result<Option<Partitions>> {
331 if partitions.is_empty() {
332 return Ok(None);
333 }
334
335 let column_list: Vec<Ident> = partitions[0]
336 .partition
337 .partition_columns()
338 .iter()
339 .map(|name| name[..].into())
340 .collect();
341
342 let exprs = partitions
343 .iter()
344 .filter_map(|partition| {
345 partition
346 .partition
347 .partition_bounds()
348 .first()
349 .and_then(|bound| {
350 if let PartitionBound::Expr(expr) = bound {
351 Some(expr.to_parser_expr())
352 } else {
353 None
354 }
355 })
356 })
357 .collect();
358
359 Ok(Some(Partitions { column_list, exprs }))
360}