common_function/system/
database.rs1use std::fmt::{self};
16use std::sync::Arc;
17
18use common_query::error::Result;
19use common_query::prelude::{Signature, Volatility};
20use datatypes::prelude::{ConcreteDataType, ScalarVector};
21use datatypes::vectors::{StringVector, UInt32Vector, VectorRef};
22use derive_more::Display;
23
24use crate::function::{Function, FunctionContext};
25
/// Zero-sized marker type for the function named `database`.
///
/// Its behaviour lives in the `Function` implementation for this type.
#[derive(Debug, Default, Clone)]
pub struct DatabaseFunction;
29
/// Zero-sized marker type for the function named `current_schema`.
///
/// Its behaviour lives in the `Function` implementation for this type.
#[derive(Debug, Default, Clone)]
pub struct CurrentSchemaFunction;
/// Zero-sized marker type for the function named `session_user`.
///
/// Derives the same common traits as the other unit-struct functions in this
/// module so it can be debug-printed, cloned and default-constructed.
#[derive(Clone, Debug, Default)]
pub struct SessionUserFunction;
33
/// Zero-sized marker type for the function named `read_preference`.
///
/// Derives the same common traits as the other unit-struct functions in this
/// module so it can be debug-printed, cloned and default-constructed.
#[derive(Clone, Debug, Default)]
pub struct ReadPreferenceFunction;
35
36#[derive(Display)]
37#[display("{}", self.name())]
38pub struct PgBackendPidFunction;
39
40#[derive(Display)]
41#[display("{}", self.name())]
42pub struct ConnectionIdFunction;
43
/// Name returned by `DatabaseFunction::name`.
const DATABASE_FUNCTION_NAME: &str = "database";
/// Name returned by `CurrentSchemaFunction::name`.
const CURRENT_SCHEMA_FUNCTION_NAME: &str = "current_schema";
/// Name returned by `SessionUserFunction::name`.
const SESSION_USER_FUNCTION_NAME: &str = "session_user";
/// Name returned by `ReadPreferenceFunction::name`.
const READ_PREFERENCE_FUNCTION_NAME: &str = "read_preference";
/// Name returned by `PgBackendPidFunction::name`.
const PG_BACKEND_PID: &str = "pg_backend_pid";
/// Name returned by `ConnectionIdFunction::name`.
const CONNECTION_ID: &str = "connection_id";
50
51impl Function for DatabaseFunction {
52 fn name(&self) -> &str {
53 DATABASE_FUNCTION_NAME
54 }
55
56 fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
57 Ok(ConcreteDataType::string_datatype())
58 }
59
60 fn signature(&self) -> Signature {
61 Signature::nullary(Volatility::Immutable)
62 }
63
64 fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
65 let db = func_ctx.query_ctx.current_schema();
66
67 Ok(Arc::new(StringVector::from_slice(&[&db])) as _)
68 }
69}
70
71impl Function for CurrentSchemaFunction {
72 fn name(&self) -> &str {
73 CURRENT_SCHEMA_FUNCTION_NAME
74 }
75
76 fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
77 Ok(ConcreteDataType::string_datatype())
78 }
79
80 fn signature(&self) -> Signature {
81 Signature::uniform(0, vec![], Volatility::Immutable)
82 }
83
84 fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
85 let db = func_ctx.query_ctx.current_schema();
86
87 Ok(Arc::new(StringVector::from_slice(&[&db])) as _)
88 }
89}
90
91impl Function for SessionUserFunction {
92 fn name(&self) -> &str {
93 SESSION_USER_FUNCTION_NAME
94 }
95
96 fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
97 Ok(ConcreteDataType::string_datatype())
98 }
99
100 fn signature(&self) -> Signature {
101 Signature::uniform(0, vec![], Volatility::Immutable)
102 }
103
104 fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
105 let user = func_ctx.query_ctx.current_user();
106
107 Ok(Arc::new(StringVector::from_slice(&[user.username()])) as _)
108 }
109}
110
111impl Function for ReadPreferenceFunction {
112 fn name(&self) -> &str {
113 READ_PREFERENCE_FUNCTION_NAME
114 }
115
116 fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
117 Ok(ConcreteDataType::string_datatype())
118 }
119
120 fn signature(&self) -> Signature {
121 Signature::nullary(Volatility::Immutable)
122 }
123
124 fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
125 let read_preference = func_ctx.query_ctx.read_preference();
126
127 Ok(Arc::new(StringVector::from_slice(&[read_preference.as_ref()])) as _)
128 }
129}
130
131impl Function for PgBackendPidFunction {
132 fn name(&self) -> &str {
133 PG_BACKEND_PID
134 }
135
136 fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
137 Ok(ConcreteDataType::uint64_datatype())
138 }
139
140 fn signature(&self) -> Signature {
141 Signature::nullary(Volatility::Immutable)
142 }
143
144 fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
145 let pid = func_ctx.query_ctx.process_id();
146
147 Ok(Arc::new(UInt32Vector::from_slice([pid])) as _)
148 }
149}
150
151impl Function for ConnectionIdFunction {
152 fn name(&self) -> &str {
153 CONNECTION_ID
154 }
155
156 fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
157 Ok(ConcreteDataType::uint64_datatype())
158 }
159
160 fn signature(&self) -> Signature {
161 Signature::nullary(Volatility::Immutable)
162 }
163
164 fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
165 let pid = func_ctx.query_ctx.process_id();
166
167 Ok(Arc::new(UInt32Vector::from_slice([pid])) as _)
168 }
169}
170
171impl fmt::Display for DatabaseFunction {
172 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
173 write!(f, "DATABASE")
174 }
175}
176
177impl fmt::Display for CurrentSchemaFunction {
178 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
179 write!(f, "CURRENT_SCHEMA")
180 }
181}
182
183impl fmt::Display for SessionUserFunction {
184 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
185 write!(f, "SESSION_USER")
186 }
187}
188
189impl fmt::Display for ReadPreferenceFunction {
190 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
191 write!(f, "READ_PREFERENCE")
192 }
193}
194
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use session::context::QueryContextBuilder;

    use super::*;

    /// Checks the metadata of `DatabaseFunction` and that evaluating it yields
    /// the schema configured on the query context.
    #[test]
    fn test_build_function() {
        let func = DatabaseFunction;

        // Static metadata: name, return type and signature.
        assert_eq!("database", func.name());
        assert_eq!(
            ConcreteDataType::string_datatype(),
            func.return_type(&[]).unwrap()
        );
        assert_eq!(func.signature(), Signature::nullary(Volatility::Immutable));

        // Evaluation against a context whose current schema is "test_db".
        let func_ctx = FunctionContext {
            query_ctx: QueryContextBuilder::default()
                .current_schema("test_db".to_string())
                .build()
                .into(),
            ..Default::default()
        };
        let actual = func.eval(&func_ctx, &[]).unwrap();
        let expected: VectorRef = Arc::new(StringVector::from(vec!["test_db"]));
        assert_eq!(expected, actual);
    }
}