operator/statement/
admin.rs1use std::sync::Arc;
16
17use common_function::function::FunctionContext;
18use common_function::function_registry::FUNCTION_REGISTRY;
19use common_query::prelude::TypeSignature;
20use common_query::Output;
21use common_recordbatch::{RecordBatch, RecordBatches};
22use common_telemetry::tracing;
23use common_time::Timezone;
24use datatypes::data_type::DataType;
25use datatypes::prelude::ConcreteDataType;
26use datatypes::schema::{ColumnSchema, Schema};
27use datatypes::value::Value;
28use datatypes::vectors::VectorRef;
29use session::context::QueryContextRef;
30use snafu::{ensure, OptionExt, ResultExt};
31use sql::ast::{Expr, FunctionArg, FunctionArgExpr, FunctionArguments, Value as SqlValue};
32use sql::statements::admin::Admin;
33use sql::statements::sql_value_to_value;
34
35use crate::error::{self, Result};
36use crate::statement::StatementExecutor;
37
38const DUMMY_COLUMN: &str = "<dummy>";
39
40impl StatementExecutor {
41 #[tracing::instrument(skip_all)]
43 pub(super) async fn execute_admin_command(
44 &self,
45 stmt: Admin,
46 query_ctx: QueryContextRef,
47 ) -> Result<Output> {
48 let Admin::Func(func) = &stmt;
49 let func_name = func.name.to_string().to_lowercase();
51 let admin_func = FUNCTION_REGISTRY
52 .get_async_function(&func_name)
53 .context(error::AdminFunctionNotFoundSnafu { name: func_name })?;
54
55 let signature = admin_func.signature();
56 let FunctionArguments::List(args) = &func.args else {
57 return error::BuildAdminFunctionArgsSnafu {
58 msg: format!("unsupported function args {}", func.args),
59 }
60 .fail();
61 };
62 let arg_values = args
63 .args
64 .iter()
65 .map(|arg| {
66 let FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Value(value))) = arg else {
67 return error::BuildAdminFunctionArgsSnafu {
68 msg: format!("unsupported function arg {arg}"),
69 }
70 .fail();
71 };
72 Ok(value)
73 })
74 .collect::<Result<Vec<_>>>()?;
75
76 let args = args_to_vector(&signature.type_signature, &arg_values, &query_ctx)?;
77 let arg_types = args.iter().map(|arg| arg.data_type()).collect::<Vec<_>>();
78
79 let func_ctx = FunctionContext {
80 query_ctx,
81 state: self.query_engine.engine_state().function_state(),
82 };
83
84 let result = admin_func
85 .eval(func_ctx, &args)
86 .await
87 .context(error::ExecuteAdminFunctionSnafu)?;
88
89 let column_schemas = vec![ColumnSchema::new(
90 stmt.to_string(),
92 admin_func
93 .return_type(&arg_types)
94 .context(error::ExecuteAdminFunctionSnafu)?,
95 false,
96 )];
97 let schema = Arc::new(Schema::new(column_schemas));
98 let batch =
99 RecordBatch::new(schema.clone(), vec![result]).context(error::BuildRecordBatchSnafu)?;
100 let batches =
101 RecordBatches::try_new(schema, vec![batch]).context(error::BuildRecordBatchSnafu)?;
102
103 Ok(Output::new_with_record_batches(batches))
104 }
105}
106
107fn args_to_vector(
109 type_signature: &TypeSignature,
110 args: &Vec<&SqlValue>,
111 query_ctx: &QueryContextRef,
112) -> Result<Vec<VectorRef>> {
113 let tz = query_ctx.timezone();
114
115 match type_signature {
116 TypeSignature::Variadic(valid_types) => {
117 values_to_vectors_by_valid_types(valid_types, args, Some(&tz))
118 }
119
120 TypeSignature::Uniform(arity, valid_types) => {
121 ensure!(
122 *arity == args.len(),
123 error::FunctionArityMismatchSnafu {
124 actual: args.len(),
125 expected: *arity,
126 }
127 );
128
129 values_to_vectors_by_valid_types(valid_types, args, Some(&tz))
130 }
131
132 TypeSignature::Exact(data_types) => {
133 values_to_vectors_by_exact_types(data_types, args, Some(&tz))
134 }
135
136 TypeSignature::VariadicAny => {
137 let data_types = args
138 .iter()
139 .map(|value| try_get_data_type_for_sql_value(value))
140 .collect::<Result<Vec<_>>>()?;
141
142 values_to_vectors_by_exact_types(&data_types, args, Some(&tz))
143 }
144
145 TypeSignature::Any(arity) => {
146 ensure!(
147 *arity == args.len(),
148 error::FunctionArityMismatchSnafu {
149 actual: args.len(),
150 expected: *arity,
151 }
152 );
153
154 let data_types = args
155 .iter()
156 .map(|value| try_get_data_type_for_sql_value(value))
157 .collect::<Result<Vec<_>>>()?;
158
159 values_to_vectors_by_exact_types(&data_types, args, Some(&tz))
160 }
161
162 TypeSignature::OneOf(type_sigs) => {
163 for type_sig in type_sigs {
164 if let Ok(vectors) = args_to_vector(type_sig, args, query_ctx) {
165 return Ok(vectors);
166 }
167 }
168
169 error::BuildAdminFunctionArgsSnafu {
170 msg: "function signature not match",
171 }
172 .fail()
173 }
174
175 TypeSignature::NullAry => Ok(vec![]),
176 }
177}
178
179fn values_to_vectors_by_exact_types(
181 exact_types: &[ConcreteDataType],
182 args: &[&SqlValue],
183 tz: Option<&Timezone>,
184) -> Result<Vec<VectorRef>> {
185 args.iter()
186 .zip(exact_types.iter())
187 .map(|(value, data_type)| {
188 let value = sql_value_to_value(DUMMY_COLUMN, data_type, value, tz, None, false)
189 .context(error::ParseSqlValueSnafu)?;
190
191 Ok(value_to_vector(value))
192 })
193 .collect()
194}
195
196fn values_to_vectors_by_valid_types(
198 valid_types: &[ConcreteDataType],
199 args: &[&SqlValue],
200 tz: Option<&Timezone>,
201) -> Result<Vec<VectorRef>> {
202 args.iter()
203 .map(|value| {
204 for data_type in valid_types {
205 if let Ok(value) =
206 sql_value_to_value(DUMMY_COLUMN, data_type, value, tz, None, false)
207 {
208 return Ok(value_to_vector(value));
209 }
210 }
211
212 error::BuildAdminFunctionArgsSnafu {
213 msg: format!("failed to cast {value}"),
214 }
215 .fail()
216 })
217 .collect::<Result<Vec<_>>>()
218}
219
220fn value_to_vector(value: Value) -> VectorRef {
222 let data_type = value.data_type();
223 let mut mutable_vector = data_type.create_mutable_vector(1);
224 mutable_vector.push_value_ref(value.as_value_ref());
225
226 mutable_vector.to_vector()
227}
228
229fn try_get_data_type_for_sql_value(value: &SqlValue) -> Result<ConcreteDataType> {
231 match value {
232 SqlValue::Number(_, _) => Ok(ConcreteDataType::float64_datatype()),
233 SqlValue::Null => Ok(ConcreteDataType::null_datatype()),
234 SqlValue::Boolean(_) => Ok(ConcreteDataType::boolean_datatype()),
235 SqlValue::HexStringLiteral(_)
236 | SqlValue::DoubleQuotedString(_)
237 | SqlValue::SingleQuotedString(_) => Ok(ConcreteDataType::string_datatype()),
238 _ => error::BuildAdminFunctionArgsSnafu {
239 msg: format!("unsupported sql value: {value}"),
240 }
241 .fail(),
242 }
243}