common_function/
function.rs1use std::any::Any;
16use std::fmt;
17use std::fmt::{Debug, Formatter};
18use std::sync::Arc;
19
20use datafusion::arrow::datatypes::{DataType, Field};
21use datafusion::logical_expr::{ColumnarValue, ReturnFieldArgs};
22use datafusion_common::DataFusionError;
23use datafusion_common::arrow::array::ArrayRef;
24use datafusion_common::config::{ConfigEntry, ConfigExtension, ExtensionOptions};
25use datafusion_expr::{ScalarFunctionArgs, Signature};
26use session::context::{QueryContextBuilder, QueryContextRef};
27
28use crate::state::FunctionState;
29
30#[derive(Clone)]
32pub struct FunctionContext {
33 pub query_ctx: QueryContextRef,
34 pub state: Arc<FunctionState>,
35}
36
37impl FunctionContext {
38 #[cfg(any(test, feature = "testing"))]
40 pub fn mock() -> Self {
41 Self {
42 query_ctx: QueryContextBuilder::default().build().into(),
43 state: Arc::new(FunctionState::mock()),
44 }
45 }
46}
47
48impl std::fmt::Display for FunctionContext {
49 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
50 write!(f, "FunctionContext {{ query_ctx: {} }}", self.query_ctx)
51 }
52}
53
54impl Default for FunctionContext {
55 fn default() -> Self {
56 Self {
57 query_ctx: QueryContextBuilder::default().build().into(),
58 state: Arc::new(FunctionState::default()),
59 }
60 }
61}
62
63impl ExtensionOptions for FunctionContext {
64 fn as_any(&self) -> &dyn Any {
65 self
66 }
67
68 fn as_any_mut(&mut self) -> &mut dyn Any {
69 self
70 }
71
72 fn cloned(&self) -> Box<dyn ExtensionOptions> {
73 Box::new(self.clone())
74 }
75
76 fn set(&mut self, _: &str, _: &str) -> datafusion_common::Result<()> {
77 Err(DataFusionError::NotImplemented(
78 "set options for `FunctionContext`".to_string(),
79 ))
80 }
81
82 fn entries(&self) -> Vec<ConfigEntry> {
83 vec![]
84 }
85}
86
87impl Debug for FunctionContext {
88 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
89 f.debug_struct("FunctionContext")
90 .field("query_ctx", &self.query_ctx)
91 .finish()
92 }
93}
94
95impl ConfigExtension for FunctionContext {
96 const PREFIX: &'static str = "FunctionContext";
97}
98
99pub trait Function: fmt::Display + Sync + Send {
102 fn name(&self) -> &str;
104
105 fn return_type(&self, input_types: &[DataType]) -> datafusion_common::Result<DataType>;
107
108 fn signature(&self) -> &Signature;
110
111 fn invoke_with_args(
112 &self,
113 args: ScalarFunctionArgs,
114 ) -> datafusion_common::Result<ColumnarValue>;
115
116 fn aliases(&self) -> &[String] {
117 &[]
118 }
119
120 fn return_field_from_args(
125 &self,
126 args: ReturnFieldArgs<'_>,
127 ) -> datafusion_common::Result<Arc<Field>> {
128 let input_types = args
129 .arg_fields
130 .iter()
131 .map(|f| f.data_type().clone())
132 .collect::<Vec<_>>();
133 let return_type = self.return_type(&input_types)?;
134 Ok(Arc::new(Field::new(self.name(), return_type, true)))
135 }
136}
137
138pub type FunctionRef = Arc<dyn Function>;
139
140pub(crate) fn find_function_context(
144 args: &ScalarFunctionArgs,
145) -> datafusion_common::Result<&FunctionContext> {
146 let Some(x) = args.config_options.extensions.get::<FunctionContext>() else {
147 return Err(DataFusionError::Execution(
148 "function context is not set".to_string(),
149 ));
150 };
151 Ok(x)
152}
153
154pub(crate) fn extract_args<const N: usize>(
156 name: &str,
157 args: &ScalarFunctionArgs,
158) -> datafusion_common::Result<[ArrayRef; N]> {
159 ColumnarValue::values_to_arrays(&args.args)
160 .and_then(|x| datafusion_common::utils::take_function_args(name, x))
161}