common_function/system/
database.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use datafusion::arrow::datatypes::DataType;
16use datafusion_common::ScalarValue;
17use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
18
19use crate::function::{Function, find_function_context};
20use crate::system::define_nullary_udf;
21
22define_nullary_udf!(DatabaseFunction);
23define_nullary_udf!(ReadPreferenceFunction);
24define_nullary_udf!(PgBackendPidFunction);
25define_nullary_udf!(ConnectionIdFunction);
26
27const DATABASE_FUNCTION_NAME: &str = "database";
28const READ_PREFERENCE_FUNCTION_NAME: &str = "read_preference";
29const PG_BACKEND_PID: &str = "pg_backend_pid";
30const CONNECTION_ID: &str = "connection_id";
31
32impl Function for DatabaseFunction {
33    fn name(&self) -> &str {
34        DATABASE_FUNCTION_NAME
35    }
36
37    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
38        Ok(DataType::Utf8View)
39    }
40
41    fn signature(&self) -> &Signature {
42        &self.signature
43    }
44
45    fn invoke_with_args(
46        &self,
47        args: ScalarFunctionArgs,
48    ) -> datafusion_common::Result<ColumnarValue> {
49        let func_ctx = find_function_context(&args)?;
50        let db = func_ctx.query_ctx.current_schema();
51
52        Ok(ColumnarValue::Scalar(ScalarValue::Utf8View(Some(db))))
53    }
54}
55
56impl Function for ReadPreferenceFunction {
57    fn name(&self) -> &str {
58        READ_PREFERENCE_FUNCTION_NAME
59    }
60
61    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
62        Ok(DataType::Utf8View)
63    }
64
65    fn signature(&self) -> &Signature {
66        &self.signature
67    }
68
69    fn invoke_with_args(
70        &self,
71        args: ScalarFunctionArgs,
72    ) -> datafusion_common::Result<ColumnarValue> {
73        let func_ctx = find_function_context(&args)?;
74        let read_preference = func_ctx.query_ctx.read_preference();
75
76        Ok(ColumnarValue::Scalar(ScalarValue::Utf8View(Some(
77            read_preference.to_string(),
78        ))))
79    }
80}
81
82impl Function for PgBackendPidFunction {
83    fn name(&self) -> &str {
84        PG_BACKEND_PID
85    }
86
87    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
88        Ok(DataType::UInt64)
89    }
90
91    fn signature(&self) -> &Signature {
92        &self.signature
93    }
94
95    fn invoke_with_args(
96        &self,
97        args: ScalarFunctionArgs,
98    ) -> datafusion_common::Result<ColumnarValue> {
99        let func_ctx = find_function_context(&args)?;
100        let pid = func_ctx.query_ctx.process_id();
101
102        Ok(ColumnarValue::Scalar(ScalarValue::UInt64(Some(pid as u64))))
103    }
104}
105
106impl Function for ConnectionIdFunction {
107    fn name(&self) -> &str {
108        CONNECTION_ID
109    }
110
111    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
112        Ok(DataType::UInt32)
113    }
114
115    fn signature(&self) -> &Signature {
116        &self.signature
117    }
118
119    fn invoke_with_args(
120        &self,
121        args: ScalarFunctionArgs,
122    ) -> datafusion_common::Result<ColumnarValue> {
123        let func_ctx = find_function_context(&args)?;
124        let pid = func_ctx.query_ctx.process_id();
125
126        Ok(ColumnarValue::Scalar(ScalarValue::UInt32(Some(pid))))
127    }
128}
129
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::Field;
    use datafusion_common::config::ConfigOptions;
    use session::context::QueryContextBuilder;

    use super::*;
    use crate::function::FunctionContext;

    /// Checks the metadata of `DatabaseFunction` and that invoking it returns
    /// the schema configured on the query context.
    #[test]
    fn test_build_function() {
        let build = DatabaseFunction::default();
        assert_eq!("database", build.name());
        assert_eq!(DataType::Utf8View, build.return_type(&[]).unwrap());

        let query_ctx = QueryContextBuilder::default()
            .current_schema("test_db".to_string())
            .build()
            .into();

        // The function reads its context from the config extensions.
        let mut config_options = ConfigOptions::default();
        config_options.extensions.insert(FunctionContext {
            query_ctx,
            ..Default::default()
        });
        let config_options = Arc::new(config_options);

        let args = ScalarFunctionArgs {
            args: vec![],
            arg_fields: vec![],
            number_rows: 0,
            // Keep the declared return field consistent with `return_type`.
            return_field: Arc::new(Field::new("x", DataType::Utf8View, false)),
            config_options,
        };

        // Report the actual value on mismatch instead of a bare `unreachable!()`.
        match build.invoke_with_args(args).unwrap() {
            ColumnarValue::Scalar(ScalarValue::Utf8View(Some(s))) => {
                assert_eq!(s, "test_db");
            }
            other => panic!("expected a non-null Utf8View scalar, got {other:?}"),
        }
    }
}