common_function/
function.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::fmt;
17use std::fmt::{Debug, Formatter};
18use std::sync::Arc;
19
20use datafusion::arrow::datatypes::DataType;
21use datafusion::logical_expr::ColumnarValue;
22use datafusion_common::DataFusionError;
23use datafusion_common::arrow::array::ArrayRef;
24use datafusion_common::config::{ConfigEntry, ConfigExtension, ExtensionOptions};
25use datafusion_expr::{ScalarFunctionArgs, Signature};
26use session::context::{QueryContextBuilder, QueryContextRef};
27
28use crate::state::FunctionState;
29
30/// The function execution context
31#[derive(Clone)]
32pub struct FunctionContext {
33    pub query_ctx: QueryContextRef,
34    pub state: Arc<FunctionState>,
35}
36
37impl FunctionContext {
38    /// Create a mock [`FunctionContext`] for test.
39    #[cfg(any(test, feature = "testing"))]
40    pub fn mock() -> Self {
41        Self {
42            query_ctx: QueryContextBuilder::default().build().into(),
43            state: Arc::new(FunctionState::mock()),
44        }
45    }
46}
47
48impl std::fmt::Display for FunctionContext {
49    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
50        write!(f, "FunctionContext {{ query_ctx: {} }}", self.query_ctx)
51    }
52}
53
54impl Default for FunctionContext {
55    fn default() -> Self {
56        Self {
57            query_ctx: QueryContextBuilder::default().build().into(),
58            state: Arc::new(FunctionState::default()),
59        }
60    }
61}
62
63impl ExtensionOptions for FunctionContext {
64    fn as_any(&self) -> &dyn Any {
65        self
66    }
67
68    fn as_any_mut(&mut self) -> &mut dyn Any {
69        self
70    }
71
72    fn cloned(&self) -> Box<dyn ExtensionOptions> {
73        Box::new(self.clone())
74    }
75
76    fn set(&mut self, _: &str, _: &str) -> datafusion_common::Result<()> {
77        Err(DataFusionError::NotImplemented(
78            "set options for `FunctionContext`".to_string(),
79        ))
80    }
81
82    fn entries(&self) -> Vec<ConfigEntry> {
83        vec![]
84    }
85}
86
87impl Debug for FunctionContext {
88    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
89        f.debug_struct("FunctionContext")
90            .field("query_ctx", &self.query_ctx)
91            .finish()
92    }
93}
94
95impl ConfigExtension for FunctionContext {
96    const PREFIX: &'static str = "FunctionContext";
97}
98
99/// Scalar function trait, modified from databend to adapt datafusion
100/// TODO(dennis): optimize function by it's features such as monotonicity etc.
101pub trait Function: fmt::Display + Sync + Send {
102    /// Returns the name of the function, should be unique.
103    fn name(&self) -> &str;
104
105    /// The returned data type of function execution.
106    fn return_type(&self, input_types: &[DataType]) -> datafusion_common::Result<DataType>;
107
108    /// The signature of function.
109    fn signature(&self) -> &Signature;
110
111    fn invoke_with_args(
112        &self,
113        args: ScalarFunctionArgs,
114    ) -> datafusion_common::Result<ColumnarValue>;
115
116    fn aliases(&self) -> &[String] {
117        &[]
118    }
119}
120
121pub type FunctionRef = Arc<dyn Function>;
122
123/// Find the [FunctionContext] in the [ScalarFunctionArgs]. The [FunctionContext] was set
124/// previously in the DataFusion session context creation, and is passed all the way down to the
125/// args by DataFusion.
126pub(crate) fn find_function_context(
127    args: &ScalarFunctionArgs,
128) -> datafusion_common::Result<&FunctionContext> {
129    let Some(x) = args.config_options.extensions.get::<FunctionContext>() else {
130        return Err(DataFusionError::Execution(
131            "function context is not set".to_string(),
132        ));
133    };
134    Ok(x)
135}
136
137/// Extract UDF arguments (as Arrow's [ArrayRef]) from [ScalarFunctionArgs] directly.
138pub(crate) fn extract_args<const N: usize>(
139    name: &str,
140    args: &ScalarFunctionArgs,
141) -> datafusion_common::Result<[ArrayRef; N]> {
142    ColumnarValue::values_to_arrays(&args.args)
143        .and_then(|x| datafusion_common::utils::take_function_args(name, x))
144}