1use common_error::ext::BoxedError;
16use common_meta::key::schema_name::SchemaNameKey;
17use common_query::Output;
18use common_telemetry::tracing;
19use partition::manager::PartitionInfo;
20use session::context::QueryContextRef;
21use session::table_name::table_idents_to_full_name;
22use snafu::{OptionExt, ResultExt};
23use sql::ast::ObjectNamePartExt;
24use sql::statements::create::Partitions;
25use sql::statements::show::{
26 ShowColumns, ShowCreateFlow, ShowCreateView, ShowDatabases, ShowFlows, ShowIndex, ShowKind,
27 ShowProcessList, ShowRegion, ShowTableStatus, ShowTables, ShowVariables, ShowViews,
28};
29use sql::statements::OptionMap;
30use table::metadata::{TableInfo, TableType};
31use table::table_name::TableName;
32use table::TableRef;
33
34use crate::error::{
35 self, CatalogSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, ExternalSnafu,
36 FindViewInfoSnafu, InvalidSqlSnafu, Result, TableMetadataManagerSnafu, ViewInfoNotFoundSnafu,
37 ViewNotFoundSnafu,
38};
39use crate::statement::StatementExecutor;
40
41impl StatementExecutor {
42 #[tracing::instrument(skip_all)]
43 pub(super) async fn show_databases(
44 &self,
45 stmt: ShowDatabases,
46 query_ctx: QueryContextRef,
47 ) -> Result<Output> {
48 query::sql::show_databases(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
49 .await
50 .context(ExecuteStatementSnafu)
51 }
52
53 #[tracing::instrument(skip_all)]
54 pub(super) async fn show_tables(
55 &self,
56 stmt: ShowTables,
57 query_ctx: QueryContextRef,
58 ) -> Result<Output> {
59 query::sql::show_tables(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
60 .await
61 .context(ExecuteStatementSnafu)
62 }
63
64 #[tracing::instrument(skip_all)]
65 pub(super) async fn show_table_status(
66 &self,
67 stmt: ShowTableStatus,
68 query_ctx: QueryContextRef,
69 ) -> Result<Output> {
70 query::sql::show_table_status(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
71 .await
72 .context(ExecuteStatementSnafu)
73 }
74
75 #[tracing::instrument(skip_all)]
76 pub(super) async fn show_columns(
77 &self,
78 stmt: ShowColumns,
79 query_ctx: QueryContextRef,
80 ) -> Result<Output> {
81 query::sql::show_columns(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
82 .await
83 .context(ExecuteStatementSnafu)
84 }
85
86 #[tracing::instrument(skip_all)]
87 pub(super) async fn show_index(
88 &self,
89 stmt: ShowIndex,
90 query_ctx: QueryContextRef,
91 ) -> Result<Output> {
92 query::sql::show_index(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
93 .await
94 .context(ExecuteStatementSnafu)
95 }
96
97 pub(super) async fn show_region(
98 &self,
99 stmt: ShowRegion,
100 query_ctx: QueryContextRef,
101 ) -> Result<Output> {
102 query::sql::show_region(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
103 .await
104 .context(ExecuteStatementSnafu)
105 }
106
107 #[tracing::instrument(skip_all)]
108 pub async fn show_create_database(
109 &self,
110 database_name: &str,
111 opts: OptionMap,
112 ) -> Result<Output> {
113 query::sql::show_create_database(database_name, opts).context(ExecuteStatementSnafu)
114 }
115
116 #[tracing::instrument(skip_all)]
117 pub async fn show_create_table(
118 &self,
119 table_name: TableName,
120 table: TableRef,
121 query_ctx: QueryContextRef,
122 ) -> Result<Output> {
123 let table_info = table.table_info();
124 if table_info.table_type != TableType::Base {
125 return error::ShowCreateTableBaseOnlySnafu {
126 table_name: table_name.to_string(),
127 table_type: table_info.table_type,
128 }
129 .fail();
130 }
131
132 let schema_options = self
133 .table_metadata_manager
134 .schema_manager()
135 .get(SchemaNameKey {
136 catalog: &table_name.catalog_name,
137 schema: &table_name.schema_name,
138 })
139 .await
140 .context(TableMetadataManagerSnafu)?
141 .map(|v| v.into_inner());
142
143 let partitions = self
144 .partition_manager
145 .find_table_partitions(table_info.table_id())
146 .await
147 .context(error::FindTablePartitionRuleSnafu {
148 table_name: &table_name.table_name,
149 })?;
150
151 let partitions = create_partitions_stmt(&table_info, partitions)?;
152
153 query::sql::show_create_table(table, schema_options, partitions, query_ctx)
154 .context(ExecuteStatementSnafu)
155 }
156
157 #[tracing::instrument(skip_all)]
158 pub async fn show_create_table_for_pg(
159 &self,
160 table_name: TableName,
161 table: TableRef,
162 query_ctx: QueryContextRef,
163 ) -> Result<Output> {
164 let table_info = table.table_info();
165 if table_info.table_type != TableType::Base {
166 return error::ShowCreateTableBaseOnlySnafu {
167 table_name: table_name.to_string(),
168 table_type: table_info.table_type,
169 }
170 .fail();
171 }
172
173 query::sql::show_create_foreign_table_for_pg(table, query_ctx)
174 .context(ExecuteStatementSnafu)
175 }
176
177 #[tracing::instrument(skip_all)]
178 pub async fn show_create_view(
179 &self,
180 show: ShowCreateView,
181 query_ctx: QueryContextRef,
182 ) -> Result<Output> {
183 let (catalog, schema, view) = table_idents_to_full_name(&show.view_name, &query_ctx)
184 .map_err(BoxedError::new)
185 .context(ExternalSnafu)?;
186
187 let table_ref = self
188 .catalog_manager
189 .table(&catalog, &schema, &view, Some(&query_ctx))
190 .await
191 .context(CatalogSnafu)?
192 .context(ViewNotFoundSnafu { view_name: &view })?;
193
194 let view_id = table_ref.table_info().ident.table_id;
195
196 let view_info = self
197 .view_info_manager
198 .get(view_id)
199 .await
200 .context(FindViewInfoSnafu { view_name: &view })?
201 .context(ViewInfoNotFoundSnafu { view_name: &view })?;
202
203 query::sql::show_create_view(show.view_name, &view_info.definition, query_ctx)
204 .context(error::ExecuteStatementSnafu)
205 }
206
207 #[tracing::instrument(skip_all)]
208 pub(super) async fn show_views(
209 &self,
210 stmt: ShowViews,
211 query_ctx: QueryContextRef,
212 ) -> Result<Output> {
213 query::sql::show_views(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
214 .await
215 .context(ExecuteStatementSnafu)
216 }
217
218 #[tracing::instrument(skip_all)]
219 pub(super) async fn show_flows(
220 &self,
221 stmt: ShowFlows,
222 query_ctx: QueryContextRef,
223 ) -> Result<Output> {
224 query::sql::show_flows(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
225 .await
226 .context(ExecuteStatementSnafu)
227 }
228
229 #[cfg(feature = "enterprise")]
230 #[tracing::instrument(skip_all)]
231 pub(super) async fn show_triggers(
232 &self,
233 stmt: sql::statements::show::trigger::ShowTriggers,
234 query_ctx: QueryContextRef,
235 ) -> Result<Output> {
236 query::sql::show_triggers(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
237 .await
238 .context(ExecuteStatementSnafu)
239 }
240
241 #[tracing::instrument(skip_all)]
242 pub async fn show_create_flow(
243 &self,
244 show: ShowCreateFlow,
245 query_ctx: QueryContextRef,
246 ) -> Result<Output> {
247 let obj_name = &show.flow_name;
248 let (catalog_name, flow_name) = match &obj_name.0[..] {
249 [table] => (query_ctx.current_catalog().to_string(), table.to_string_unquoted()),
250 [catalog, table] => (catalog.to_string_unquoted(), table.to_string_unquoted()),
251 _ => {
252 return InvalidSqlSnafu {
253 err_msg: format!(
254 "expect flow name to be <catalog>.<flow_name> or <flow_name>, actual: {obj_name}",
255 ),
256 }
257 .fail()
258 }
259 };
260
261 let flow_name_val = self
262 .flow_metadata_manager
263 .flow_name_manager()
264 .get(&catalog_name, &flow_name)
265 .await
266 .context(error::TableMetadataManagerSnafu)?
267 .context(error::FlowNotFoundSnafu {
268 flow_name: &flow_name,
269 })?;
270
271 let flow_val = self
272 .flow_metadata_manager
273 .flow_info_manager()
274 .get(flow_name_val.flow_id())
275 .await
276 .context(error::TableMetadataManagerSnafu)?
277 .context(error::FlowNotFoundSnafu {
278 flow_name: &flow_name,
279 })?;
280
281 query::sql::show_create_flow(obj_name.clone(), flow_val, query_ctx)
282 .context(error::ExecuteStatementSnafu)
283 }
284
285 #[tracing::instrument(skip_all)]
286 pub fn show_variable(&self, stmt: ShowVariables, query_ctx: QueryContextRef) -> Result<Output> {
287 query::sql::show_variable(stmt, query_ctx).context(error::ExecuteStatementSnafu)
288 }
289
290 #[tracing::instrument(skip_all)]
291 pub async fn show_collation(
292 &self,
293 kind: ShowKind,
294 query_ctx: QueryContextRef,
295 ) -> Result<Output> {
296 query::sql::show_collations(kind, &self.query_engine, &self.catalog_manager, query_ctx)
297 .await
298 .context(error::ExecuteStatementSnafu)
299 }
300
301 #[tracing::instrument(skip_all)]
302 pub async fn show_charset(&self, kind: ShowKind, query_ctx: QueryContextRef) -> Result<Output> {
303 query::sql::show_charsets(kind, &self.query_engine, &self.catalog_manager, query_ctx)
304 .await
305 .context(error::ExecuteStatementSnafu)
306 }
307
308 #[tracing::instrument(skip_all)]
309 pub async fn show_status(&self, query_ctx: QueryContextRef) -> Result<Output> {
310 query::sql::show_status(query_ctx)
311 .await
312 .context(error::ExecuteStatementSnafu)
313 }
314 pub async fn show_search_path(&self, query_ctx: QueryContextRef) -> Result<Output> {
315 query::sql::show_search_path(query_ctx)
316 .await
317 .context(error::ExecuteStatementSnafu)
318 }
319
320 pub async fn show_processlist(
321 &self,
322 stmt: ShowProcessList,
323 query_ctx: QueryContextRef,
324 ) -> Result<Output> {
325 query::sql::show_processlist(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
326 .await
327 .context(ExecLogicalPlanSnafu)
328 }
329}
330
331pub(crate) fn create_partitions_stmt(
332 table_info: &TableInfo,
333 partitions: Vec<PartitionInfo>,
334) -> Result<Option<Partitions>> {
335 if partitions.is_empty() {
336 return Ok(None);
337 }
338
339 let column_list = table_info
340 .meta
341 .partition_column_names()
342 .map(|name| name[..].into())
343 .collect();
344
345 let exprs = partitions
346 .iter()
347 .filter_map(|partition| {
348 partition
349 .partition_expr
350 .as_ref()
351 .map(|expr| expr.to_parser_expr())
352 })
353 .collect();
354
355 Ok(Some(Partitions { column_list, exprs }))
356}