common_function/system/
database.rs1use datafusion::arrow::datatypes::DataType;
16use datafusion_common::ScalarValue;
17use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
18
19use crate::function::{Function, find_function_context};
20use crate::system::define_nullary_udf;
21
22define_nullary_udf!(DatabaseFunction);
23define_nullary_udf!(ReadPreferenceFunction);
24define_nullary_udf!(PgBackendPidFunction);
25define_nullary_udf!(ConnectionIdFunction);
26
/// Name reported by `DatabaseFunction::name`.
const DATABASE_FUNCTION_NAME: &str = "database";
/// Name reported by `ReadPreferenceFunction::name`.
const READ_PREFERENCE_FUNCTION_NAME: &str = "read_preference";
/// Name reported by `PgBackendPidFunction::name`.
const PG_BACKEND_PID: &str = "pg_backend_pid";
/// Name reported by `ConnectionIdFunction::name`.
const CONNECTION_ID: &str = "connection_id";
31
32impl Function for DatabaseFunction {
33 fn name(&self) -> &str {
34 DATABASE_FUNCTION_NAME
35 }
36
37 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
38 Ok(DataType::Utf8View)
39 }
40
41 fn signature(&self) -> &Signature {
42 &self.signature
43 }
44
45 fn invoke_with_args(
46 &self,
47 args: ScalarFunctionArgs,
48 ) -> datafusion_common::Result<ColumnarValue> {
49 let func_ctx = find_function_context(&args)?;
50 let db = func_ctx.query_ctx.current_schema();
51
52 Ok(ColumnarValue::Scalar(ScalarValue::Utf8View(Some(db))))
53 }
54}
55
56impl Function for ReadPreferenceFunction {
57 fn name(&self) -> &str {
58 READ_PREFERENCE_FUNCTION_NAME
59 }
60
61 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
62 Ok(DataType::Utf8View)
63 }
64
65 fn signature(&self) -> &Signature {
66 &self.signature
67 }
68
69 fn invoke_with_args(
70 &self,
71 args: ScalarFunctionArgs,
72 ) -> datafusion_common::Result<ColumnarValue> {
73 let func_ctx = find_function_context(&args)?;
74 let read_preference = func_ctx.query_ctx.read_preference();
75
76 Ok(ColumnarValue::Scalar(ScalarValue::Utf8View(Some(
77 read_preference.to_string(),
78 ))))
79 }
80}
81
82impl Function for PgBackendPidFunction {
83 fn name(&self) -> &str {
84 PG_BACKEND_PID
85 }
86
87 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
88 Ok(DataType::UInt64)
89 }
90
91 fn signature(&self) -> &Signature {
92 &self.signature
93 }
94
95 fn invoke_with_args(
96 &self,
97 args: ScalarFunctionArgs,
98 ) -> datafusion_common::Result<ColumnarValue> {
99 let func_ctx = find_function_context(&args)?;
100 let pid = func_ctx.query_ctx.process_id();
101
102 Ok(ColumnarValue::Scalar(ScalarValue::UInt64(Some(pid as u64))))
103 }
104}
105
106impl Function for ConnectionIdFunction {
107 fn name(&self) -> &str {
108 CONNECTION_ID
109 }
110
111 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
112 Ok(DataType::UInt32)
113 }
114
115 fn signature(&self) -> &Signature {
116 &self.signature
117 }
118
119 fn invoke_with_args(
120 &self,
121 args: ScalarFunctionArgs,
122 ) -> datafusion_common::Result<ColumnarValue> {
123 let func_ctx = find_function_context(&args)?;
124 let pid = func_ctx.query_ctx.process_id();
125
126 Ok(ColumnarValue::Scalar(ScalarValue::UInt32(Some(pid))))
127 }
128}
129
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::Field;
    use datafusion_common::config::ConfigOptions;
    use session::context::QueryContextBuilder;

    use super::*;
    use crate::function::FunctionContext;

    /// `database()` reports its name and return type, and evaluates to the
    /// schema configured on the query context.
    #[test]
    fn test_build_function() {
        let build = DatabaseFunction::default();
        assert_eq!("database", build.name());
        assert_eq!(DataType::Utf8View, build.return_type(&[]).unwrap());

        let query_ctx = QueryContextBuilder::default()
            .current_schema("test_db".to_string())
            .build()
            .into();

        // The function context is registered as a config extension;
        // `invoke_with_args` presumably retrieves it from there through
        // `find_function_context` -- confirm in `crate::function`.
        let mut config_options = ConfigOptions::default();
        config_options.extensions.insert(FunctionContext {
            query_ctx,
            ..Default::default()
        });
        let config_options = Arc::new(config_options);

        let args = ScalarFunctionArgs {
            args: vec![],
            arg_fields: vec![],
            number_rows: 0,
            // Declared return field agrees with what `return_type` reports.
            return_field: Arc::new(Field::new("x", DataType::Utf8View, false)),
            config_options,
        };
        let result = build.invoke_with_args(args).unwrap();
        let ColumnarValue::Scalar(ScalarValue::Utf8View(Some(s))) = result else {
            panic!("database() should evaluate to a non-null Utf8View scalar")
        };
        assert_eq!(s, "test_db");
    }
}