common_function/system/
database.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{self};
16use std::sync::Arc;
17
18use common_query::error::Result;
19use common_query::prelude::{Signature, Volatility};
20use datatypes::prelude::{ConcreteDataType, ScalarVector};
21use datatypes::vectors::{StringVector, UInt32Vector, VectorRef};
22use derive_more::Display;
23
24use crate::function::{Function, FunctionContext};
25
/// Function named `database` that reports the current schema of the query context.
///
/// It takes no arguments and evaluates to a single string value.
#[derive(Debug, Default, Clone)]
pub struct DatabaseFunction;
29
/// Function named `current_schema` that reports the current schema of the query context.
///
/// It takes no arguments and evaluates to a single string value.
#[derive(Debug, Default, Clone)]
pub struct CurrentSchemaFunction;
/// Function named `session_user` that reports the username of the query
/// context's current user.
///
/// It takes no arguments and evaluates to a single string value.
#[derive(Clone, Debug, Default)]
pub struct SessionUserFunction;
33
/// Function named `read_preference` that reports the read preference of the
/// query context as a string.
///
/// It takes no arguments and evaluates to a single string value.
#[derive(Clone, Debug, Default)]
pub struct ReadPreferenceFunction;
35
36#[derive(Display)]
37#[display("{}", self.name())]
38pub struct PgBackendPidFunction;
39
40#[derive(Display)]
41#[display("{}", self.name())]
42pub struct ConnectionIdFunction;
43
/// Name returned by `DatabaseFunction::name`.
const DATABASE_FUNCTION_NAME: &str = "database";
/// Name returned by `CurrentSchemaFunction::name`.
const CURRENT_SCHEMA_FUNCTION_NAME: &str = "current_schema";
/// Name returned by `SessionUserFunction::name`.
const SESSION_USER_FUNCTION_NAME: &str = "session_user";
/// Name returned by `ReadPreferenceFunction::name`.
const READ_PREFERENCE_FUNCTION_NAME: &str = "read_preference";
/// Name returned by `PgBackendPidFunction::name`.
const PG_BACKEND_PID: &str = "pg_backend_pid";
/// Name returned by `ConnectionIdFunction::name`.
const CONNECTION_ID: &str = "connection_id";
50
51impl Function for DatabaseFunction {
52    fn name(&self) -> &str {
53        DATABASE_FUNCTION_NAME
54    }
55
56    fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
57        Ok(ConcreteDataType::string_datatype())
58    }
59
60    fn signature(&self) -> Signature {
61        Signature::nullary(Volatility::Immutable)
62    }
63
64    fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
65        let db = func_ctx.query_ctx.current_schema();
66
67        Ok(Arc::new(StringVector::from_slice(&[&db])) as _)
68    }
69}
70
71impl Function for CurrentSchemaFunction {
72    fn name(&self) -> &str {
73        CURRENT_SCHEMA_FUNCTION_NAME
74    }
75
76    fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
77        Ok(ConcreteDataType::string_datatype())
78    }
79
80    fn signature(&self) -> Signature {
81        Signature::uniform(0, vec![], Volatility::Immutable)
82    }
83
84    fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
85        let db = func_ctx.query_ctx.current_schema();
86
87        Ok(Arc::new(StringVector::from_slice(&[&db])) as _)
88    }
89}
90
91impl Function for SessionUserFunction {
92    fn name(&self) -> &str {
93        SESSION_USER_FUNCTION_NAME
94    }
95
96    fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
97        Ok(ConcreteDataType::string_datatype())
98    }
99
100    fn signature(&self) -> Signature {
101        Signature::uniform(0, vec![], Volatility::Immutable)
102    }
103
104    fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
105        let user = func_ctx.query_ctx.current_user();
106
107        Ok(Arc::new(StringVector::from_slice(&[user.username()])) as _)
108    }
109}
110
111impl Function for ReadPreferenceFunction {
112    fn name(&self) -> &str {
113        READ_PREFERENCE_FUNCTION_NAME
114    }
115
116    fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
117        Ok(ConcreteDataType::string_datatype())
118    }
119
120    fn signature(&self) -> Signature {
121        Signature::nullary(Volatility::Immutable)
122    }
123
124    fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
125        let read_preference = func_ctx.query_ctx.read_preference();
126
127        Ok(Arc::new(StringVector::from_slice(&[read_preference.as_ref()])) as _)
128    }
129}
130
131impl Function for PgBackendPidFunction {
132    fn name(&self) -> &str {
133        PG_BACKEND_PID
134    }
135
136    fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
137        Ok(ConcreteDataType::uint64_datatype())
138    }
139
140    fn signature(&self) -> Signature {
141        Signature::nullary(Volatility::Immutable)
142    }
143
144    fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
145        let pid = func_ctx.query_ctx.process_id();
146
147        Ok(Arc::new(UInt32Vector::from_slice([pid])) as _)
148    }
149}
150
151impl Function for ConnectionIdFunction {
152    fn name(&self) -> &str {
153        CONNECTION_ID
154    }
155
156    fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
157        Ok(ConcreteDataType::uint64_datatype())
158    }
159
160    fn signature(&self) -> Signature {
161        Signature::nullary(Volatility::Immutable)
162    }
163
164    fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
165        let pid = func_ctx.query_ctx.process_id();
166
167        Ok(Arc::new(UInt32Vector::from_slice([pid])) as _)
168    }
169}
170
171impl fmt::Display for DatabaseFunction {
172    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
173        write!(f, "DATABASE")
174    }
175}
176
177impl fmt::Display for CurrentSchemaFunction {
178    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
179        write!(f, "CURRENT_SCHEMA")
180    }
181}
182
183impl fmt::Display for SessionUserFunction {
184    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
185        write!(f, "SESSION_USER")
186    }
187}
188
189impl fmt::Display for ReadPreferenceFunction {
190    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
191        write!(f, "READ_PREFERENCE")
192    }
193}
194
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use session::context::QueryContextBuilder;

    use super::*;

    /// Checks the metadata of `DatabaseFunction` and that evaluating it yields
    /// the schema configured on the query context.
    #[test]
    fn test_build_function() {
        let func = DatabaseFunction;

        // Static metadata: name, return type and signature.
        assert_eq!("database", func.name());
        let return_type = func.return_type(&[]).unwrap();
        assert_eq!(ConcreteDataType::string_datatype(), return_type);
        assert_eq!(func.signature(), Signature::nullary(Volatility::Immutable));

        // Evaluate against a context whose current schema is `test_db`.
        let func_ctx = FunctionContext {
            query_ctx: QueryContextBuilder::default()
                .current_schema(String::from("test_db"))
                .build()
                .into(),
            ..Default::default()
        };
        let actual = func.eval(&func_ctx, &[]).unwrap();
        let expected: VectorRef = Arc::new(StringVector::from(vec!["test_db"]));
        assert_eq!(expected, actual);
    }
}
225}