1use common_error::ext::BoxedError;
16use common_meta::key::schema_name::SchemaNameKey;
17use common_query::Output;
18use common_telemetry::tracing;
19use partition::manager::PartitionInfo;
20use partition::partition::PartitionBound;
21use session::context::QueryContextRef;
22use session::table_name::table_idents_to_full_name;
23use snafu::{OptionExt, ResultExt};
24use sql::ast::Ident;
25use sql::statements::create::Partitions;
26use sql::statements::show::{
27 ShowColumns, ShowCreateFlow, ShowCreateView, ShowDatabases, ShowFlows, ShowIndex, ShowKind,
28 ShowRegion, ShowTableStatus, ShowTables, ShowVariables, ShowViews,
29};
30use sql::statements::OptionMap;
31use table::metadata::TableType;
32use table::table_name::TableName;
33use table::TableRef;
34
35use crate::error::{
36 self, CatalogSnafu, ExecuteStatementSnafu, ExternalSnafu, FindViewInfoSnafu, InvalidSqlSnafu,
37 Result, TableMetadataManagerSnafu, ViewInfoNotFoundSnafu, ViewNotFoundSnafu,
38};
39use crate::statement::StatementExecutor;
40
41impl StatementExecutor {
42 #[tracing::instrument(skip_all)]
43 pub(super) async fn show_databases(
44 &self,
45 stmt: ShowDatabases,
46 query_ctx: QueryContextRef,
47 ) -> Result<Output> {
48 query::sql::show_databases(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
49 .await
50 .context(ExecuteStatementSnafu)
51 }
52
53 #[tracing::instrument(skip_all)]
54 pub(super) async fn show_tables(
55 &self,
56 stmt: ShowTables,
57 query_ctx: QueryContextRef,
58 ) -> Result<Output> {
59 query::sql::show_tables(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
60 .await
61 .context(ExecuteStatementSnafu)
62 }
63
64 #[tracing::instrument(skip_all)]
65 pub(super) async fn show_table_status(
66 &self,
67 stmt: ShowTableStatus,
68 query_ctx: QueryContextRef,
69 ) -> Result<Output> {
70 query::sql::show_table_status(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
71 .await
72 .context(ExecuteStatementSnafu)
73 }
74
75 #[tracing::instrument(skip_all)]
76 pub(super) async fn show_columns(
77 &self,
78 stmt: ShowColumns,
79 query_ctx: QueryContextRef,
80 ) -> Result<Output> {
81 query::sql::show_columns(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
82 .await
83 .context(ExecuteStatementSnafu)
84 }
85
86 #[tracing::instrument(skip_all)]
87 pub(super) async fn show_index(
88 &self,
89 stmt: ShowIndex,
90 query_ctx: QueryContextRef,
91 ) -> Result<Output> {
92 query::sql::show_index(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
93 .await
94 .context(ExecuteStatementSnafu)
95 }
96
97 pub(super) async fn show_region(
98 &self,
99 stmt: ShowRegion,
100 query_ctx: QueryContextRef,
101 ) -> Result<Output> {
102 query::sql::show_region(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
103 .await
104 .context(ExecuteStatementSnafu)
105 }
106
107 #[tracing::instrument(skip_all)]
108 pub async fn show_create_database(
109 &self,
110 database_name: &str,
111 opts: OptionMap,
112 ) -> Result<Output> {
113 query::sql::show_create_database(database_name, opts).context(ExecuteStatementSnafu)
114 }
115
116 #[tracing::instrument(skip_all)]
117 pub async fn show_create_table(
118 &self,
119 table_name: TableName,
120 table: TableRef,
121 query_ctx: QueryContextRef,
122 ) -> Result<Output> {
123 let table_info = table.table_info();
124 if table_info.table_type != TableType::Base {
125 return error::ShowCreateTableBaseOnlySnafu {
126 table_name: table_name.to_string(),
127 table_type: table_info.table_type,
128 }
129 .fail();
130 }
131
132 let schema_options = self
133 .table_metadata_manager
134 .schema_manager()
135 .get(SchemaNameKey {
136 catalog: &table_name.catalog_name,
137 schema: &table_name.schema_name,
138 })
139 .await
140 .context(TableMetadataManagerSnafu)?
141 .map(|v| v.into_inner());
142
143 let partitions = self
144 .partition_manager
145 .find_table_partitions(table.table_info().table_id())
146 .await
147 .context(error::FindTablePartitionRuleSnafu {
148 table_name: &table_name.table_name,
149 })?;
150
151 let partitions = create_partitions_stmt(partitions)?;
152
153 query::sql::show_create_table(table, schema_options, partitions, query_ctx)
154 .context(ExecuteStatementSnafu)
155 }
156
157 #[tracing::instrument(skip_all)]
158 pub async fn show_create_table_for_pg(
159 &self,
160 table_name: TableName,
161 table: TableRef,
162 query_ctx: QueryContextRef,
163 ) -> Result<Output> {
164 let table_info = table.table_info();
165 if table_info.table_type != TableType::Base {
166 return error::ShowCreateTableBaseOnlySnafu {
167 table_name: table_name.to_string(),
168 table_type: table_info.table_type,
169 }
170 .fail();
171 }
172
173 query::sql::show_create_foreign_table_for_pg(table, query_ctx)
174 .context(ExecuteStatementSnafu)
175 }
176
177 #[tracing::instrument(skip_all)]
178 pub async fn show_create_view(
179 &self,
180 show: ShowCreateView,
181 query_ctx: QueryContextRef,
182 ) -> Result<Output> {
183 let (catalog, schema, view) = table_idents_to_full_name(&show.view_name, &query_ctx)
184 .map_err(BoxedError::new)
185 .context(ExternalSnafu)?;
186
187 let table_ref = self
188 .catalog_manager
189 .table(&catalog, &schema, &view, Some(&query_ctx))
190 .await
191 .context(CatalogSnafu)?
192 .context(ViewNotFoundSnafu { view_name: &view })?;
193
194 let view_id = table_ref.table_info().ident.table_id;
195
196 let view_info = self
197 .view_info_manager
198 .get(view_id)
199 .await
200 .context(FindViewInfoSnafu { view_name: &view })?
201 .context(ViewInfoNotFoundSnafu { view_name: &view })?;
202
203 query::sql::show_create_view(show.view_name, &view_info.definition, query_ctx)
204 .context(error::ExecuteStatementSnafu)
205 }
206
207 #[tracing::instrument(skip_all)]
208 pub(super) async fn show_views(
209 &self,
210 stmt: ShowViews,
211 query_ctx: QueryContextRef,
212 ) -> Result<Output> {
213 query::sql::show_views(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
214 .await
215 .context(ExecuteStatementSnafu)
216 }
217
218 #[tracing::instrument(skip_all)]
219 pub(super) async fn show_flows(
220 &self,
221 stmt: ShowFlows,
222 query_ctx: QueryContextRef,
223 ) -> Result<Output> {
224 query::sql::show_flows(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
225 .await
226 .context(ExecuteStatementSnafu)
227 }
228
229 #[cfg(feature = "enterprise")]
230 #[tracing::instrument(skip_all)]
231 pub(super) async fn show_triggers(
232 &self,
233 _stmt: sql::statements::show::trigger::ShowTriggers,
234 _query_ctx: QueryContextRef,
235 ) -> Result<Output> {
236 crate::error::UnsupportedTriggerSnafu.fail()
237 }
238
239 #[tracing::instrument(skip_all)]
240 pub async fn show_create_flow(
241 &self,
242 show: ShowCreateFlow,
243 query_ctx: QueryContextRef,
244 ) -> Result<Output> {
245 let obj_name = &show.flow_name;
246 let (catalog_name, flow_name) = match &obj_name.0[..] {
247 [table] => (query_ctx.current_catalog().to_string(), table.value.clone()),
248 [catalog, table] => (catalog.value.clone(), table.value.clone()),
249 _ => {
250 return InvalidSqlSnafu {
251 err_msg: format!(
252 "expect flow name to be <catalog>.<flow_name> or <flow_name>, actual: {obj_name}",
253 ),
254 }
255 .fail()
256 }
257 };
258
259 let flow_name_val = self
260 .flow_metadata_manager
261 .flow_name_manager()
262 .get(&catalog_name, &flow_name)
263 .await
264 .context(error::TableMetadataManagerSnafu)?
265 .context(error::FlowNotFoundSnafu {
266 flow_name: &flow_name,
267 })?;
268
269 let flow_val = self
270 .flow_metadata_manager
271 .flow_info_manager()
272 .get(flow_name_val.flow_id())
273 .await
274 .context(error::TableMetadataManagerSnafu)?
275 .context(error::FlowNotFoundSnafu {
276 flow_name: &flow_name,
277 })?;
278
279 query::sql::show_create_flow(obj_name.clone(), flow_val, query_ctx)
280 .context(error::ExecuteStatementSnafu)
281 }
282
283 #[tracing::instrument(skip_all)]
284 pub fn show_variable(&self, stmt: ShowVariables, query_ctx: QueryContextRef) -> Result<Output> {
285 query::sql::show_variable(stmt, query_ctx).context(error::ExecuteStatementSnafu)
286 }
287
288 #[tracing::instrument(skip_all)]
289 pub async fn show_collation(
290 &self,
291 kind: ShowKind,
292 query_ctx: QueryContextRef,
293 ) -> Result<Output> {
294 query::sql::show_collations(kind, &self.query_engine, &self.catalog_manager, query_ctx)
295 .await
296 .context(error::ExecuteStatementSnafu)
297 }
298
299 #[tracing::instrument(skip_all)]
300 pub async fn show_charset(&self, kind: ShowKind, query_ctx: QueryContextRef) -> Result<Output> {
301 query::sql::show_charsets(kind, &self.query_engine, &self.catalog_manager, query_ctx)
302 .await
303 .context(error::ExecuteStatementSnafu)
304 }
305
306 #[tracing::instrument(skip_all)]
307 pub async fn show_status(&self, query_ctx: QueryContextRef) -> Result<Output> {
308 query::sql::show_status(query_ctx)
309 .await
310 .context(error::ExecuteStatementSnafu)
311 }
312 pub async fn show_search_path(&self, query_ctx: QueryContextRef) -> Result<Output> {
313 query::sql::show_search_path(query_ctx)
314 .await
315 .context(error::ExecuteStatementSnafu)
316 }
317}
318
319pub(crate) fn create_partitions_stmt(partitions: Vec<PartitionInfo>) -> Result<Option<Partitions>> {
320 if partitions.is_empty() {
321 return Ok(None);
322 }
323
324 let column_list: Vec<Ident> = partitions[0]
325 .partition
326 .partition_columns()
327 .iter()
328 .map(|name| name[..].into())
329 .collect();
330
331 let exprs = partitions
332 .iter()
333 .filter_map(|partition| {
334 partition
335 .partition
336 .partition_bounds()
337 .first()
338 .and_then(|bound| {
339 if let PartitionBound::Expr(expr) = bound {
340 Some(expr.to_parser_expr())
341 } else {
342 None
343 }
344 })
345 })
346 .collect();
347
348 Ok(Some(Partitions { column_list, exprs }))
349}