common_function/system/
database.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{self};
16use std::sync::Arc;
17
18use common_query::error::Result;
19use datafusion::arrow::datatypes::DataType;
20use datafusion_expr::{Signature, Volatility};
21use datatypes::prelude::ScalarVector;
22use datatypes::vectors::{StringVector, UInt32Vector, VectorRef};
23use derive_more::Display;
24
25use crate::function::{Function, FunctionContext};
26
/// SQL scalar function `database()`.
///
/// Takes no arguments and evaluates to a one-element string vector holding the
/// current schema name read from the query context.
#[derive(Clone, Debug, Default)]
pub struct DatabaseFunction;
30
/// SQL scalar function `current_schema()`.
///
/// Takes no arguments and evaluates to a one-element string vector holding the
/// current schema name read from the query context.
#[derive(Clone, Debug, Default)]
pub struct CurrentSchemaFunction;
/// SQL scalar function `session_user()`.
///
/// Takes no arguments and evaluates to a one-element string vector holding the
/// username of the query context's current user.
#[derive(Clone, Debug, Default)]
pub struct SessionUserFunction;
34
/// SQL scalar function `read_preference()`.
///
/// Takes no arguments and evaluates to a one-element string vector holding the
/// read preference recorded in the query context.
#[derive(Clone, Debug, Default)]
pub struct ReadPreferenceFunction;
36
37#[derive(Display)]
38#[display("{}", self.name())]
39pub struct PgBackendPidFunction;
40
41#[derive(Display)]
42#[display("{}", self.name())]
43pub struct ConnectionIdFunction;
44
// Names returned by the `Function::name` implementations below, one constant
// per function type defined in this file.
const DATABASE_FUNCTION_NAME: &str = "database";
const CURRENT_SCHEMA_FUNCTION_NAME: &str = "current_schema";
const SESSION_USER_FUNCTION_NAME: &str = "session_user";
const READ_PREFERENCE_FUNCTION_NAME: &str = "read_preference";
const PG_BACKEND_PID: &str = "pg_backend_pid";
const CONNECTION_ID: &str = "connection_id";
51
52impl Function for DatabaseFunction {
53    fn name(&self) -> &str {
54        DATABASE_FUNCTION_NAME
55    }
56
57    fn return_type(&self, _: &[DataType]) -> Result<DataType> {
58        Ok(DataType::Utf8)
59    }
60
61    fn signature(&self) -> Signature {
62        Signature::nullary(Volatility::Immutable)
63    }
64
65    fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
66        let db = func_ctx.query_ctx.current_schema();
67
68        Ok(Arc::new(StringVector::from_slice(&[&db])) as _)
69    }
70}
71
72// Though "current_schema" can be aliased to "database", to not cause any breaking changes,
73// we are not doing it: not until https://github.com/apache/datafusion/issues/17469 is resolved.
74impl Function for CurrentSchemaFunction {
75    fn name(&self) -> &str {
76        CURRENT_SCHEMA_FUNCTION_NAME
77    }
78
79    fn return_type(&self, _: &[DataType]) -> Result<DataType> {
80        Ok(DataType::Utf8)
81    }
82
83    fn signature(&self) -> Signature {
84        Signature::nullary(Volatility::Immutable)
85    }
86
87    fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
88        let db = func_ctx.query_ctx.current_schema();
89
90        Ok(Arc::new(StringVector::from_slice(&[&db])) as _)
91    }
92}
93
94impl Function for SessionUserFunction {
95    fn name(&self) -> &str {
96        SESSION_USER_FUNCTION_NAME
97    }
98
99    fn return_type(&self, _: &[DataType]) -> Result<DataType> {
100        Ok(DataType::Utf8)
101    }
102
103    fn signature(&self) -> Signature {
104        Signature::nullary(Volatility::Immutable)
105    }
106
107    fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
108        let user = func_ctx.query_ctx.current_user();
109
110        Ok(Arc::new(StringVector::from_slice(&[user.username()])) as _)
111    }
112}
113
114impl Function for ReadPreferenceFunction {
115    fn name(&self) -> &str {
116        READ_PREFERENCE_FUNCTION_NAME
117    }
118
119    fn return_type(&self, _: &[DataType]) -> Result<DataType> {
120        Ok(DataType::Utf8)
121    }
122
123    fn signature(&self) -> Signature {
124        Signature::nullary(Volatility::Immutable)
125    }
126
127    fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
128        let read_preference = func_ctx.query_ctx.read_preference();
129
130        Ok(Arc::new(StringVector::from_slice(&[read_preference.as_ref()])) as _)
131    }
132}
133
134impl Function for PgBackendPidFunction {
135    fn name(&self) -> &str {
136        PG_BACKEND_PID
137    }
138
139    fn return_type(&self, _: &[DataType]) -> Result<DataType> {
140        Ok(DataType::UInt64)
141    }
142
143    fn signature(&self) -> Signature {
144        Signature::nullary(Volatility::Immutable)
145    }
146
147    fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
148        let pid = func_ctx.query_ctx.process_id();
149
150        Ok(Arc::new(UInt32Vector::from_slice([pid])) as _)
151    }
152}
153
154impl Function for ConnectionIdFunction {
155    fn name(&self) -> &str {
156        CONNECTION_ID
157    }
158
159    fn return_type(&self, _: &[DataType]) -> Result<DataType> {
160        Ok(DataType::UInt64)
161    }
162
163    fn signature(&self) -> Signature {
164        Signature::nullary(Volatility::Immutable)
165    }
166
167    fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
168        let pid = func_ctx.query_ctx.process_id();
169
170        Ok(Arc::new(UInt32Vector::from_slice([pid])) as _)
171    }
172}
173
174impl fmt::Display for DatabaseFunction {
175    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
176        write!(f, "DATABASE")
177    }
178}
179
180impl fmt::Display for CurrentSchemaFunction {
181    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
182        write!(f, "CURRENT_SCHEMA")
183    }
184}
185
186impl fmt::Display for SessionUserFunction {
187    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
188        write!(f, "SESSION_USER")
189    }
190}
191
192impl fmt::Display for ReadPreferenceFunction {
193    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
194        write!(f, "READ_PREFERENCE")
195    }
196}
197
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use session::context::QueryContextBuilder;

    use super::*;

    /// `database()` exposes the expected metadata and evaluates to the schema
    /// configured on the query context.
    #[test]
    fn test_build_function() {
        let function = DatabaseFunction;

        // Static metadata of the function.
        assert_eq!("database", function.name());
        assert_eq!(DataType::Utf8, function.return_type(&[]).unwrap());
        assert_eq!(
            function.signature(),
            Signature::nullary(Volatility::Immutable)
        );

        // Evaluate against a context whose current schema is `test_db`.
        let func_ctx = FunctionContext {
            query_ctx: QueryContextBuilder::default()
                .current_schema("test_db".to_string())
                .build()
                .into(),
            ..Default::default()
        };
        let actual = function.eval(&func_ctx, &[]).unwrap();

        let expected: VectorRef = Arc::new(StringVector::from(vec!["test_db"]));
        assert_eq!(expected, actual);
    }
}
225}