common_function/system/
database.rs1use std::fmt::{self};
16use std::sync::Arc;
17
18use common_query::error::Result;
19use datafusion::arrow::datatypes::DataType;
20use datafusion_expr::{Signature, Volatility};
21use datatypes::prelude::ScalarVector;
22use datatypes::vectors::{StringVector, UInt32Vector, VectorRef};
23use derive_more::Display;
24
25use crate::function::{Function, FunctionContext};
26
/// Function named `database`.
///
/// Its `Function` implementation evaluates to a one-element string vector
/// holding the current schema of the query context; its `Display` form is
/// the fixed text `DATABASE`.
#[derive(Clone, Debug, Default)]
pub struct DatabaseFunction;
30
/// Function named `current_schema`.
///
/// Its `Function` implementation evaluates to a one-element string vector
/// holding the current schema of the query context; its `Display` form is
/// the fixed text `CURRENT_SCHEMA`.
#[derive(Clone, Debug, Default)]
pub struct CurrentSchemaFunction;
/// Function named `session_user`.
///
/// Its `Function` implementation evaluates to a one-element string vector
/// holding the username of the query context's current user; its `Display`
/// form is the fixed text `SESSION_USER`.
pub struct SessionUserFunction;
34
/// Function named `read_preference`.
///
/// Its `Function` implementation evaluates to a one-element string vector
/// holding the `as_ref()` text of the query context's read preference; its
/// `Display` form is the fixed text `READ_PREFERENCE`.
pub struct ReadPreferenceFunction;
36
/// Function named `pg_backend_pid`.
///
/// Its `Function` implementation evaluates to the process id reported by the
/// query context. `Display` is derived and prints whatever `name()` returns.
#[derive(Display)]
#[display("{}", self.name())]
pub struct PgBackendPidFunction;
40
/// Function named `connection_id`.
///
/// Its `Function` implementation evaluates to the process id reported by the
/// query context. `Display` is derived and prints whatever `name()` returns.
#[derive(Display)]
#[display("{}", self.name())]
pub struct ConnectionIdFunction;
44
// Names returned by the `Function::name` implementations in this file,
// one constant per function type.
const DATABASE_FUNCTION_NAME: &str = "database";
const CURRENT_SCHEMA_FUNCTION_NAME: &str = "current_schema";
const SESSION_USER_FUNCTION_NAME: &str = "session_user";
const READ_PREFERENCE_FUNCTION_NAME: &str = "read_preference";
const PG_BACKEND_PID: &str = "pg_backend_pid";
const CONNECTION_ID: &str = "connection_id";
51
52impl Function for DatabaseFunction {
53 fn name(&self) -> &str {
54 DATABASE_FUNCTION_NAME
55 }
56
57 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
58 Ok(DataType::Utf8)
59 }
60
61 fn signature(&self) -> Signature {
62 Signature::nullary(Volatility::Immutable)
63 }
64
65 fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
66 let db = func_ctx.query_ctx.current_schema();
67
68 Ok(Arc::new(StringVector::from_slice(&[&db])) as _)
69 }
70}
71
72impl Function for CurrentSchemaFunction {
75 fn name(&self) -> &str {
76 CURRENT_SCHEMA_FUNCTION_NAME
77 }
78
79 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
80 Ok(DataType::Utf8)
81 }
82
83 fn signature(&self) -> Signature {
84 Signature::nullary(Volatility::Immutable)
85 }
86
87 fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
88 let db = func_ctx.query_ctx.current_schema();
89
90 Ok(Arc::new(StringVector::from_slice(&[&db])) as _)
91 }
92}
93
94impl Function for SessionUserFunction {
95 fn name(&self) -> &str {
96 SESSION_USER_FUNCTION_NAME
97 }
98
99 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
100 Ok(DataType::Utf8)
101 }
102
103 fn signature(&self) -> Signature {
104 Signature::nullary(Volatility::Immutable)
105 }
106
107 fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
108 let user = func_ctx.query_ctx.current_user();
109
110 Ok(Arc::new(StringVector::from_slice(&[user.username()])) as _)
111 }
112}
113
114impl Function for ReadPreferenceFunction {
115 fn name(&self) -> &str {
116 READ_PREFERENCE_FUNCTION_NAME
117 }
118
119 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
120 Ok(DataType::Utf8)
121 }
122
123 fn signature(&self) -> Signature {
124 Signature::nullary(Volatility::Immutable)
125 }
126
127 fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
128 let read_preference = func_ctx.query_ctx.read_preference();
129
130 Ok(Arc::new(StringVector::from_slice(&[read_preference.as_ref()])) as _)
131 }
132}
133
134impl Function for PgBackendPidFunction {
135 fn name(&self) -> &str {
136 PG_BACKEND_PID
137 }
138
139 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
140 Ok(DataType::UInt64)
141 }
142
143 fn signature(&self) -> Signature {
144 Signature::nullary(Volatility::Immutable)
145 }
146
147 fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
148 let pid = func_ctx.query_ctx.process_id();
149
150 Ok(Arc::new(UInt32Vector::from_slice([pid])) as _)
151 }
152}
153
154impl Function for ConnectionIdFunction {
155 fn name(&self) -> &str {
156 CONNECTION_ID
157 }
158
159 fn return_type(&self, _: &[DataType]) -> Result<DataType> {
160 Ok(DataType::UInt64)
161 }
162
163 fn signature(&self) -> Signature {
164 Signature::nullary(Volatility::Immutable)
165 }
166
167 fn eval(&self, func_ctx: &FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
168 let pid = func_ctx.query_ctx.process_id();
169
170 Ok(Arc::new(UInt32Vector::from_slice([pid])) as _)
171 }
172}
173
174impl fmt::Display for DatabaseFunction {
175 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
176 write!(f, "DATABASE")
177 }
178}
179
180impl fmt::Display for CurrentSchemaFunction {
181 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
182 write!(f, "CURRENT_SCHEMA")
183 }
184}
185
186impl fmt::Display for SessionUserFunction {
187 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
188 write!(f, "SESSION_USER")
189 }
190}
191
192impl fmt::Display for ReadPreferenceFunction {
193 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
194 write!(f, "READ_PREFERENCE")
195 }
196}
197
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use session::context::QueryContextBuilder;

    use super::*;

    /// `DatabaseFunction` reports its static metadata and evaluates to the
    /// current schema configured on the query context.
    #[test]
    fn test_build_function() {
        let function = DatabaseFunction;

        // Static metadata: name, return type and signature.
        assert_eq!("database", function.name());
        assert_eq!(DataType::Utf8, function.return_type(&[]).unwrap());
        assert_eq!(function.signature(), Signature::nullary(Volatility::Immutable));

        // Evaluate against a context whose current schema is `test_db`.
        let func_ctx = FunctionContext {
            query_ctx: QueryContextBuilder::default()
                .current_schema("test_db".to_string())
                .build()
                .into(),
            ..Default::default()
        };
        let actual = function.eval(&func_ctx, &[]).unwrap();
        let expected: VectorRef = Arc::new(StringVector::from(vec!["test_db"]));
        assert_eq!(expected, actual);
    }
}