operator/statement/
admin.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use common_function::function::FunctionContext;
18use common_function::function_registry::FUNCTION_REGISTRY;
19use common_query::prelude::TypeSignature;
20use common_query::Output;
21use common_recordbatch::{RecordBatch, RecordBatches};
22use common_telemetry::tracing;
23use common_time::Timezone;
24use datatypes::data_type::DataType;
25use datatypes::prelude::ConcreteDataType;
26use datatypes::schema::{ColumnSchema, Schema};
27use datatypes::value::Value;
28use datatypes::vectors::VectorRef;
29use session::context::QueryContextRef;
30use snafu::{ensure, OptionExt, ResultExt};
31use sql::ast::{Expr, FunctionArg, FunctionArgExpr, FunctionArguments, Value as SqlValue};
32use sql::statements::admin::Admin;
33use sql::statements::sql_value_to_value;
34
35use crate::error::{self, Result};
36use crate::statement::StatementExecutor;
37
/// Placeholder passed as the first argument of `sql_value_to_value` when converting
/// admin function arguments in this file.
// NOTE(review): presumably this is the column name used in conversion error messages;
// admin function arguments do not belong to any table column — confirm against
// `sql_value_to_value`.
38const DUMMY_COLUMN: &str = "<dummy>";
39
40impl StatementExecutor {
41    /// Execute the [`Admin`] statement and returns the output.
42    #[tracing::instrument(skip_all)]
43    pub(super) async fn execute_admin_command(
44        &self,
45        stmt: Admin,
46        query_ctx: QueryContextRef,
47    ) -> Result<Output> {
48        let Admin::Func(func) = &stmt;
49        // the function name should be in lower case.
50        let func_name = func.name.to_string().to_lowercase();
51        let admin_func = FUNCTION_REGISTRY
52            .get_async_function(&func_name)
53            .context(error::AdminFunctionNotFoundSnafu { name: func_name })?;
54
55        let signature = admin_func.signature();
56        let FunctionArguments::List(args) = &func.args else {
57            return error::BuildAdminFunctionArgsSnafu {
58                msg: format!("unsupported function args {}", func.args),
59            }
60            .fail();
61        };
62        let arg_values = args
63            .args
64            .iter()
65            .map(|arg| {
66                let FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Value(value))) = arg else {
67                    return error::BuildAdminFunctionArgsSnafu {
68                        msg: format!("unsupported function arg {arg}"),
69                    }
70                    .fail();
71                };
72                Ok(value)
73            })
74            .collect::<Result<Vec<_>>>()?;
75
76        let args = args_to_vector(&signature.type_signature, &arg_values, &query_ctx)?;
77        let arg_types = args.iter().map(|arg| arg.data_type()).collect::<Vec<_>>();
78
79        let func_ctx = FunctionContext {
80            query_ctx,
81            state: self.query_engine.engine_state().function_state(),
82        };
83
84        let result = admin_func
85            .eval(func_ctx, &args)
86            .await
87            .context(error::ExecuteAdminFunctionSnafu)?;
88
89        let column_schemas = vec![ColumnSchema::new(
90            // Use statement as the result column name
91            stmt.to_string(),
92            admin_func
93                .return_type(&arg_types)
94                .context(error::ExecuteAdminFunctionSnafu)?,
95            false,
96        )];
97        let schema = Arc::new(Schema::new(column_schemas));
98        let batch =
99            RecordBatch::new(schema.clone(), vec![result]).context(error::BuildRecordBatchSnafu)?;
100        let batches =
101            RecordBatches::try_new(schema, vec![batch]).context(error::BuildRecordBatchSnafu)?;
102
103        Ok(Output::new_with_record_batches(batches))
104    }
105}
106
107/// Try to cast the arguments to vectors by function's signature.
108fn args_to_vector(
109    type_signature: &TypeSignature,
110    args: &Vec<&SqlValue>,
111    query_ctx: &QueryContextRef,
112) -> Result<Vec<VectorRef>> {
113    let tz = query_ctx.timezone();
114
115    match type_signature {
116        TypeSignature::Variadic(valid_types) => {
117            values_to_vectors_by_valid_types(valid_types, args, Some(&tz))
118        }
119
120        TypeSignature::Uniform(arity, valid_types) => {
121            ensure!(
122                *arity == args.len(),
123                error::FunctionArityMismatchSnafu {
124                    actual: args.len(),
125                    expected: *arity,
126                }
127            );
128
129            values_to_vectors_by_valid_types(valid_types, args, Some(&tz))
130        }
131
132        TypeSignature::Exact(data_types) => {
133            values_to_vectors_by_exact_types(data_types, args, Some(&tz))
134        }
135
136        TypeSignature::VariadicAny => {
137            let data_types = args
138                .iter()
139                .map(|value| try_get_data_type_for_sql_value(value))
140                .collect::<Result<Vec<_>>>()?;
141
142            values_to_vectors_by_exact_types(&data_types, args, Some(&tz))
143        }
144
145        TypeSignature::Any(arity) => {
146            ensure!(
147                *arity == args.len(),
148                error::FunctionArityMismatchSnafu {
149                    actual: args.len(),
150                    expected: *arity,
151                }
152            );
153
154            let data_types = args
155                .iter()
156                .map(|value| try_get_data_type_for_sql_value(value))
157                .collect::<Result<Vec<_>>>()?;
158
159            values_to_vectors_by_exact_types(&data_types, args, Some(&tz))
160        }
161
162        TypeSignature::OneOf(type_sigs) => {
163            for type_sig in type_sigs {
164                if let Ok(vectors) = args_to_vector(type_sig, args, query_ctx) {
165                    return Ok(vectors);
166                }
167            }
168
169            error::BuildAdminFunctionArgsSnafu {
170                msg: "function signature not match",
171            }
172            .fail()
173        }
174
175        TypeSignature::NullAry => Ok(vec![]),
176    }
177}
178
179/// Try to cast sql values to vectors by exact data types.
180fn values_to_vectors_by_exact_types(
181    exact_types: &[ConcreteDataType],
182    args: &[&SqlValue],
183    tz: Option<&Timezone>,
184) -> Result<Vec<VectorRef>> {
185    args.iter()
186        .zip(exact_types.iter())
187        .map(|(value, data_type)| {
188            let value = sql_value_to_value(DUMMY_COLUMN, data_type, value, tz, None, false)
189                .context(error::ParseSqlValueSnafu)?;
190
191            Ok(value_to_vector(value))
192        })
193        .collect()
194}
195
196/// Try to cast sql values to vectors by valid data types.
197fn values_to_vectors_by_valid_types(
198    valid_types: &[ConcreteDataType],
199    args: &[&SqlValue],
200    tz: Option<&Timezone>,
201) -> Result<Vec<VectorRef>> {
202    args.iter()
203        .map(|value| {
204            for data_type in valid_types {
205                if let Ok(value) =
206                    sql_value_to_value(DUMMY_COLUMN, data_type, value, tz, None, false)
207                {
208                    return Ok(value_to_vector(value));
209                }
210            }
211
212            error::BuildAdminFunctionArgsSnafu {
213                msg: format!("failed to cast {value}"),
214            }
215            .fail()
216        })
217        .collect::<Result<Vec<_>>>()
218}
219
220/// Build a [`VectorRef`] from [`Value`]
221fn value_to_vector(value: Value) -> VectorRef {
222    let data_type = value.data_type();
223    let mut mutable_vector = data_type.create_mutable_vector(1);
224    mutable_vector.push_value_ref(value.as_value_ref());
225
226    mutable_vector.to_vector()
227}
228
229/// Try to infer the data type from sql value.
230fn try_get_data_type_for_sql_value(value: &SqlValue) -> Result<ConcreteDataType> {
231    match value {
232        SqlValue::Number(_, _) => Ok(ConcreteDataType::float64_datatype()),
233        SqlValue::Null => Ok(ConcreteDataType::null_datatype()),
234        SqlValue::Boolean(_) => Ok(ConcreteDataType::boolean_datatype()),
235        SqlValue::HexStringLiteral(_)
236        | SqlValue::DoubleQuotedString(_)
237        | SqlValue::SingleQuotedString(_) => Ok(ConcreteDataType::string_datatype()),
238        _ => error::BuildAdminFunctionArgsSnafu {
239            msg: format!("unsupported sql value: {value}"),
240        }
241        .fail(),
242    }
243}