1use common_error::ext::BoxedError;
16use common_meta::key::schema_name::SchemaNameKey;
17use common_query::Output;
18use common_telemetry::tracing;
19use partition::manager::PartitionInfo;
20use partition::partition::PartitionBound;
21use session::context::QueryContextRef;
22use session::table_name::table_idents_to_full_name;
23use snafu::{OptionExt, ResultExt};
24use sql::ast::Ident;
25use sql::statements::create::Partitions;
26use sql::statements::show::{
27 ShowColumns, ShowCreateFlow, ShowCreateView, ShowDatabases, ShowFlows, ShowIndex, ShowKind,
28 ShowProcessList, ShowRegion, ShowTableStatus, ShowTables, ShowVariables, ShowViews,
29};
30use sql::statements::OptionMap;
31use table::metadata::TableType;
32use table::table_name::TableName;
33use table::TableRef;
34
35use crate::error::{
36 self, CatalogSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, ExternalSnafu,
37 FindViewInfoSnafu, InvalidSqlSnafu, Result, TableMetadataManagerSnafu, ViewInfoNotFoundSnafu,
38 ViewNotFoundSnafu,
39};
40use crate::statement::StatementExecutor;
41
42impl StatementExecutor {
43 #[tracing::instrument(skip_all)]
44 pub(super) async fn show_databases(
45 &self,
46 stmt: ShowDatabases,
47 query_ctx: QueryContextRef,
48 ) -> Result<Output> {
49 query::sql::show_databases(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
50 .await
51 .context(ExecuteStatementSnafu)
52 }
53
54 #[tracing::instrument(skip_all)]
55 pub(super) async fn show_tables(
56 &self,
57 stmt: ShowTables,
58 query_ctx: QueryContextRef,
59 ) -> Result<Output> {
60 query::sql::show_tables(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
61 .await
62 .context(ExecuteStatementSnafu)
63 }
64
65 #[tracing::instrument(skip_all)]
66 pub(super) async fn show_table_status(
67 &self,
68 stmt: ShowTableStatus,
69 query_ctx: QueryContextRef,
70 ) -> Result<Output> {
71 query::sql::show_table_status(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
72 .await
73 .context(ExecuteStatementSnafu)
74 }
75
76 #[tracing::instrument(skip_all)]
77 pub(super) async fn show_columns(
78 &self,
79 stmt: ShowColumns,
80 query_ctx: QueryContextRef,
81 ) -> Result<Output> {
82 query::sql::show_columns(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
83 .await
84 .context(ExecuteStatementSnafu)
85 }
86
87 #[tracing::instrument(skip_all)]
88 pub(super) async fn show_index(
89 &self,
90 stmt: ShowIndex,
91 query_ctx: QueryContextRef,
92 ) -> Result<Output> {
93 query::sql::show_index(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
94 .await
95 .context(ExecuteStatementSnafu)
96 }
97
98 pub(super) async fn show_region(
99 &self,
100 stmt: ShowRegion,
101 query_ctx: QueryContextRef,
102 ) -> Result<Output> {
103 query::sql::show_region(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
104 .await
105 .context(ExecuteStatementSnafu)
106 }
107
108 #[tracing::instrument(skip_all)]
109 pub async fn show_create_database(
110 &self,
111 database_name: &str,
112 opts: OptionMap,
113 ) -> Result<Output> {
114 query::sql::show_create_database(database_name, opts).context(ExecuteStatementSnafu)
115 }
116
117 #[tracing::instrument(skip_all)]
118 pub async fn show_create_table(
119 &self,
120 table_name: TableName,
121 table: TableRef,
122 query_ctx: QueryContextRef,
123 ) -> Result<Output> {
124 let table_info = table.table_info();
125 if table_info.table_type != TableType::Base {
126 return error::ShowCreateTableBaseOnlySnafu {
127 table_name: table_name.to_string(),
128 table_type: table_info.table_type,
129 }
130 .fail();
131 }
132
133 let schema_options = self
134 .table_metadata_manager
135 .schema_manager()
136 .get(SchemaNameKey {
137 catalog: &table_name.catalog_name,
138 schema: &table_name.schema_name,
139 })
140 .await
141 .context(TableMetadataManagerSnafu)?
142 .map(|v| v.into_inner());
143
144 let partitions = self
145 .partition_manager
146 .find_table_partitions(table.table_info().table_id())
147 .await
148 .context(error::FindTablePartitionRuleSnafu {
149 table_name: &table_name.table_name,
150 })?;
151
152 let partitions = create_partitions_stmt(partitions)?;
153
154 query::sql::show_create_table(table, schema_options, partitions, query_ctx)
155 .context(ExecuteStatementSnafu)
156 }
157
158 #[tracing::instrument(skip_all)]
159 pub async fn show_create_table_for_pg(
160 &self,
161 table_name: TableName,
162 table: TableRef,
163 query_ctx: QueryContextRef,
164 ) -> Result<Output> {
165 let table_info = table.table_info();
166 if table_info.table_type != TableType::Base {
167 return error::ShowCreateTableBaseOnlySnafu {
168 table_name: table_name.to_string(),
169 table_type: table_info.table_type,
170 }
171 .fail();
172 }
173
174 query::sql::show_create_foreign_table_for_pg(table, query_ctx)
175 .context(ExecuteStatementSnafu)
176 }
177
178 #[tracing::instrument(skip_all)]
179 pub async fn show_create_view(
180 &self,
181 show: ShowCreateView,
182 query_ctx: QueryContextRef,
183 ) -> Result<Output> {
184 let (catalog, schema, view) = table_idents_to_full_name(&show.view_name, &query_ctx)
185 .map_err(BoxedError::new)
186 .context(ExternalSnafu)?;
187
188 let table_ref = self
189 .catalog_manager
190 .table(&catalog, &schema, &view, Some(&query_ctx))
191 .await
192 .context(CatalogSnafu)?
193 .context(ViewNotFoundSnafu { view_name: &view })?;
194
195 let view_id = table_ref.table_info().ident.table_id;
196
197 let view_info = self
198 .view_info_manager
199 .get(view_id)
200 .await
201 .context(FindViewInfoSnafu { view_name: &view })?
202 .context(ViewInfoNotFoundSnafu { view_name: &view })?;
203
204 query::sql::show_create_view(show.view_name, &view_info.definition, query_ctx)
205 .context(error::ExecuteStatementSnafu)
206 }
207
208 #[tracing::instrument(skip_all)]
209 pub(super) async fn show_views(
210 &self,
211 stmt: ShowViews,
212 query_ctx: QueryContextRef,
213 ) -> Result<Output> {
214 query::sql::show_views(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
215 .await
216 .context(ExecuteStatementSnafu)
217 }
218
219 #[tracing::instrument(skip_all)]
220 pub(super) async fn show_flows(
221 &self,
222 stmt: ShowFlows,
223 query_ctx: QueryContextRef,
224 ) -> Result<Output> {
225 query::sql::show_flows(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
226 .await
227 .context(ExecuteStatementSnafu)
228 }
229
230 #[cfg(feature = "enterprise")]
231 #[tracing::instrument(skip_all)]
232 pub(super) async fn show_triggers(
233 &self,
234 stmt: sql::statements::show::trigger::ShowTriggers,
235 query_ctx: QueryContextRef,
236 ) -> Result<Output> {
237 query::sql::show_triggers(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
238 .await
239 .context(ExecuteStatementSnafu)
240 }
241
242 #[tracing::instrument(skip_all)]
243 pub async fn show_create_flow(
244 &self,
245 show: ShowCreateFlow,
246 query_ctx: QueryContextRef,
247 ) -> Result<Output> {
248 let obj_name = &show.flow_name;
249 let (catalog_name, flow_name) = match &obj_name.0[..] {
250 [table] => (query_ctx.current_catalog().to_string(), table.value.clone()),
251 [catalog, table] => (catalog.value.clone(), table.value.clone()),
252 _ => {
253 return InvalidSqlSnafu {
254 err_msg: format!(
255 "expect flow name to be <catalog>.<flow_name> or <flow_name>, actual: {obj_name}",
256 ),
257 }
258 .fail()
259 }
260 };
261
262 let flow_name_val = self
263 .flow_metadata_manager
264 .flow_name_manager()
265 .get(&catalog_name, &flow_name)
266 .await
267 .context(error::TableMetadataManagerSnafu)?
268 .context(error::FlowNotFoundSnafu {
269 flow_name: &flow_name,
270 })?;
271
272 let flow_val = self
273 .flow_metadata_manager
274 .flow_info_manager()
275 .get(flow_name_val.flow_id())
276 .await
277 .context(error::TableMetadataManagerSnafu)?
278 .context(error::FlowNotFoundSnafu {
279 flow_name: &flow_name,
280 })?;
281
282 query::sql::show_create_flow(obj_name.clone(), flow_val, query_ctx)
283 .context(error::ExecuteStatementSnafu)
284 }
285
286 #[tracing::instrument(skip_all)]
287 pub fn show_variable(&self, stmt: ShowVariables, query_ctx: QueryContextRef) -> Result<Output> {
288 query::sql::show_variable(stmt, query_ctx).context(error::ExecuteStatementSnafu)
289 }
290
291 #[tracing::instrument(skip_all)]
292 pub async fn show_collation(
293 &self,
294 kind: ShowKind,
295 query_ctx: QueryContextRef,
296 ) -> Result<Output> {
297 query::sql::show_collations(kind, &self.query_engine, &self.catalog_manager, query_ctx)
298 .await
299 .context(error::ExecuteStatementSnafu)
300 }
301
302 #[tracing::instrument(skip_all)]
303 pub async fn show_charset(&self, kind: ShowKind, query_ctx: QueryContextRef) -> Result<Output> {
304 query::sql::show_charsets(kind, &self.query_engine, &self.catalog_manager, query_ctx)
305 .await
306 .context(error::ExecuteStatementSnafu)
307 }
308
309 #[tracing::instrument(skip_all)]
310 pub async fn show_status(&self, query_ctx: QueryContextRef) -> Result<Output> {
311 query::sql::show_status(query_ctx)
312 .await
313 .context(error::ExecuteStatementSnafu)
314 }
315 pub async fn show_search_path(&self, query_ctx: QueryContextRef) -> Result<Output> {
316 query::sql::show_search_path(query_ctx)
317 .await
318 .context(error::ExecuteStatementSnafu)
319 }
320
321 pub async fn show_processlist(
322 &self,
323 stmt: ShowProcessList,
324 query_ctx: QueryContextRef,
325 ) -> Result<Output> {
326 query::sql::show_processlist(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
327 .await
328 .context(ExecLogicalPlanSnafu)
329 }
330}
331
332pub(crate) fn create_partitions_stmt(partitions: Vec<PartitionInfo>) -> Result<Option<Partitions>> {
333 if partitions.is_empty() {
334 return Ok(None);
335 }
336
337 let column_list: Vec<Ident> = partitions[0]
338 .partition
339 .partition_columns()
340 .iter()
341 .map(|name| name[..].into())
342 .collect();
343
344 let exprs = partitions
345 .iter()
346 .filter_map(|partition| {
347 partition
348 .partition
349 .partition_bounds()
350 .first()
351 .and_then(|bound| {
352 if let PartitionBound::Expr(expr) = bound {
353 Some(expr.to_parser_expr())
354 } else {
355 None
356 }
357 })
358 })
359 .collect();
360
361 Ok(Some(Partitions { column_list, exprs }))
362}