1use std::sync::Arc;
16
17use common_catalog::consts::{
18 DEFAULT_PRIVATE_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, PG_CATALOG_NAME,
19};
20use datafusion::arrow::array::{ArrayRef, StringArray, StringBuilder, as_boolean_array};
21use datafusion::catalog::TableFunction;
22use datafusion::common::ScalarValue;
23use datafusion::common::utils::SingleRowListArrayBuilder;
24use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, TypeSignature, Volatility};
25use datafusion_pg_catalog::pg_catalog::{self, PgCatalogStaticTables};
26use datatypes::arrow::datatypes::{DataType, Field};
27use derive_more::derive::Display;
28
29use crate::function::{Function, find_function_context};
30use crate::function_registry::FunctionRegistry;
31use crate::system::define_nullary_udf;
32
// Names reported by the `Function::name` implementations in this file; each constant is
// returned verbatim by the function type it is named after.
const CURRENT_SCHEMA_FUNCTION_NAME: &str = "current_schema";
const CURRENT_SCHEMAS_FUNCTION_NAME: &str = "current_schemas";
const SESSION_USER_FUNCTION_NAME: &str = "session_user";
const CURRENT_DATABASE_FUNCTION_NAME: &str = "current_database";
const OBJ_DESCRIPTION_FUNCTION_NAME: &str = "obj_description";
const COL_DESCRIPTION_FUNCTION_NAME: &str = "col_description";
const SHOBJ_DESCRIPTION_FUNCTION_NAME: &str = "shobj_description";
const PG_MY_TEMP_SCHEMA_FUNCTION_NAME: &str = "pg_my_temp_schema";
41
// Declares the function types whose `Function` impls appear in this file.
// NOTE(review): the macro body lives in `crate::system` and is not visible here. The impls
// in this file read `self.signature` and `PGCatalogFunction::register` calls `::default()`
// on these types, so the macro presumably generates both a `signature` field and a
// `Default` impl — confirm against the macro definition.
define_nullary_udf!(CurrentSchemaFunction);
define_nullary_udf!(SessionUserFunction);
define_nullary_udf!(CurrentDatabaseFunction);
define_nullary_udf!(PgMyTempSchemaFunction);
46
47impl Function for CurrentDatabaseFunction {
48 fn name(&self) -> &str {
49 CURRENT_DATABASE_FUNCTION_NAME
50 }
51
52 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
53 Ok(DataType::Utf8View)
54 }
55
56 fn signature(&self) -> &Signature {
57 &self.signature
58 }
59
60 fn invoke_with_args(
61 &self,
62 args: ScalarFunctionArgs,
63 ) -> datafusion_common::Result<ColumnarValue> {
64 let func_ctx = find_function_context(&args)?;
65 let db = func_ctx.query_ctx.current_catalog().to_string();
66
67 Ok(ColumnarValue::Scalar(ScalarValue::Utf8View(Some(db))))
68 }
69}
70
71impl Function for CurrentSchemaFunction {
74 fn name(&self) -> &str {
75 CURRENT_SCHEMA_FUNCTION_NAME
76 }
77
78 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
79 Ok(DataType::Utf8View)
80 }
81
82 fn signature(&self) -> &Signature {
83 &self.signature
84 }
85
86 fn invoke_with_args(
87 &self,
88 args: ScalarFunctionArgs,
89 ) -> datafusion_common::Result<ColumnarValue> {
90 let func_ctx = find_function_context(&args)?;
91 let db = func_ctx.query_ctx.current_schema();
92
93 Ok(ColumnarValue::Scalar(ScalarValue::Utf8View(Some(db))))
94 }
95}
96
97impl Function for SessionUserFunction {
98 fn name(&self) -> &str {
99 SESSION_USER_FUNCTION_NAME
100 }
101
102 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
103 Ok(DataType::Utf8View)
104 }
105
106 fn signature(&self) -> &Signature {
107 &self.signature
108 }
109
110 fn invoke_with_args(
111 &self,
112 args: ScalarFunctionArgs,
113 ) -> datafusion_common::Result<ColumnarValue> {
114 let func_ctx = find_function_context(&args)?;
115 let user = func_ctx.query_ctx.current_user();
116
117 Ok(ColumnarValue::Scalar(ScalarValue::Utf8View(Some(
118 user.username().to_string(),
119 ))))
120 }
121}
122
123#[derive(Display, Debug)]
124#[display("{}", self.name())]
125pub(super) struct CurrentSchemasFunction {
126 signature: Signature,
127}
128
129impl CurrentSchemasFunction {
130 pub fn new() -> Self {
131 Self {
132 signature: Signature::new(
133 TypeSignature::Exact(vec![DataType::Boolean]),
134 Volatility::Stable,
135 ),
136 }
137 }
138}
139
140impl Function for CurrentSchemasFunction {
141 fn name(&self) -> &str {
142 CURRENT_SCHEMAS_FUNCTION_NAME
143 }
144
145 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
146 Ok(DataType::List(Arc::new(Field::new(
147 "item",
148 DataType::Utf8,
149 true,
150 ))))
151 }
152
153 fn signature(&self) -> &Signature {
154 &self.signature
155 }
156
157 fn invoke_with_args(
158 &self,
159 args: ScalarFunctionArgs,
160 ) -> datafusion_common::Result<ColumnarValue> {
161 let args = ColumnarValue::values_to_arrays(&args.args)?;
162 let input = as_boolean_array(&args[0]);
163
164 let mut values = vec!["public"];
166 if input.value(0) {
168 values.push(INFORMATION_SCHEMA_NAME);
169 values.push(PG_CATALOG_NAME);
170 values.push(DEFAULT_PRIVATE_SCHEMA_NAME);
171 }
172
173 let list_array = SingleRowListArrayBuilder::new(Arc::new(StringArray::from(values)));
174
175 let array: ArrayRef = Arc::new(list_array.build_list_array());
176
177 Ok(ColumnarValue::Array(array))
178 }
179}
180
181#[derive(Display, Debug, Clone)]
183#[display("{}", self.name())]
184pub(super) struct ObjDescriptionFunction {
185 signature: Signature,
186}
187
188impl ObjDescriptionFunction {
189 pub fn new() -> Self {
190 Self {
191 signature: Signature::one_of(
192 vec![
193 TypeSignature::Exact(vec![DataType::Int64, DataType::Utf8]),
194 TypeSignature::Exact(vec![DataType::UInt32, DataType::Utf8]),
195 TypeSignature::Exact(vec![DataType::Int64]),
196 TypeSignature::Exact(vec![DataType::UInt32]),
197 ],
198 Volatility::Stable,
199 ),
200 }
201 }
202}
203
204impl Function for ObjDescriptionFunction {
205 fn name(&self) -> &str {
206 OBJ_DESCRIPTION_FUNCTION_NAME
207 }
208
209 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
210 Ok(DataType::Utf8)
211 }
212
213 fn signature(&self) -> &Signature {
214 &self.signature
215 }
216
217 fn invoke_with_args(
218 &self,
219 args: ScalarFunctionArgs,
220 ) -> datafusion_common::Result<ColumnarValue> {
221 let num_rows = args.number_rows;
222 let mut builder = StringBuilder::with_capacity(num_rows, 0);
223 for _ in 0..num_rows {
224 builder.append_null();
225 }
226 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
227 }
228}
229
230#[derive(Display, Debug, Clone)]
232#[display("{}", self.name())]
233pub(super) struct ColDescriptionFunction {
234 signature: Signature,
235}
236
237impl ColDescriptionFunction {
238 pub fn new() -> Self {
239 Self {
240 signature: Signature::one_of(
241 vec![
242 TypeSignature::Exact(vec![DataType::Int64, DataType::Int32]),
243 TypeSignature::Exact(vec![DataType::UInt32, DataType::Int32]),
244 TypeSignature::Exact(vec![DataType::Int64, DataType::Int64]),
245 TypeSignature::Exact(vec![DataType::UInt32, DataType::Int64]),
246 ],
247 Volatility::Stable,
248 ),
249 }
250 }
251}
252
253impl Function for ColDescriptionFunction {
254 fn name(&self) -> &str {
255 COL_DESCRIPTION_FUNCTION_NAME
256 }
257
258 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
259 Ok(DataType::Utf8)
260 }
261
262 fn signature(&self) -> &Signature {
263 &self.signature
264 }
265
266 fn invoke_with_args(
267 &self,
268 args: ScalarFunctionArgs,
269 ) -> datafusion_common::Result<ColumnarValue> {
270 let num_rows = args.number_rows;
271 let mut builder = StringBuilder::with_capacity(num_rows, 0);
272 for _ in 0..num_rows {
273 builder.append_null();
274 }
275 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
276 }
277}
278
279#[derive(Display, Debug, Clone)]
281#[display("{}", self.name())]
282pub(super) struct ShobjDescriptionFunction {
283 signature: Signature,
284}
285
286impl ShobjDescriptionFunction {
287 pub fn new() -> Self {
288 Self {
289 signature: Signature::one_of(
290 vec![
291 TypeSignature::Exact(vec![DataType::Int64, DataType::Utf8]),
292 TypeSignature::Exact(vec![DataType::UInt64, DataType::Utf8]),
293 TypeSignature::Exact(vec![DataType::Int32, DataType::Utf8]),
294 TypeSignature::Exact(vec![DataType::UInt32, DataType::Utf8]),
295 ],
296 Volatility::Stable,
297 ),
298 }
299 }
300}
301
302impl Function for ShobjDescriptionFunction {
303 fn name(&self) -> &str {
304 SHOBJ_DESCRIPTION_FUNCTION_NAME
305 }
306
307 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
308 Ok(DataType::Utf8)
309 }
310
311 fn signature(&self) -> &Signature {
312 &self.signature
313 }
314
315 fn invoke_with_args(
316 &self,
317 args: ScalarFunctionArgs,
318 ) -> datafusion_common::Result<ColumnarValue> {
319 let num_rows = args.number_rows;
320 let mut builder = StringBuilder::with_capacity(num_rows, 0);
321 for _ in 0..num_rows {
322 builder.append_null();
323 }
324 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
325 }
326}
327
328impl Function for PgMyTempSchemaFunction {
330 fn name(&self) -> &str {
331 PG_MY_TEMP_SCHEMA_FUNCTION_NAME
332 }
333
334 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
335 Ok(DataType::UInt32)
336 }
337
338 fn signature(&self) -> &Signature {
339 &self.signature
340 }
341
342 fn invoke_with_args(
343 &self,
344 _args: ScalarFunctionArgs,
345 ) -> datafusion_common::Result<ColumnarValue> {
346 Ok(ColumnarValue::Scalar(ScalarValue::UInt32(Some(0))))
347 }
348}
349
350pub(super) struct PGCatalogFunction;
351
352impl PGCatalogFunction {
353 pub fn register(registry: &FunctionRegistry) {
354 let static_tables =
355 Arc::new(PgCatalogStaticTables::try_new().expect("load postgres static tables"));
356
357 registry.register_scalar(CurrentSchemaFunction::default());
358 registry.register_scalar(CurrentSchemasFunction::new());
359 registry.register_scalar(SessionUserFunction::default());
360 registry.register_scalar(CurrentDatabaseFunction::default());
361 registry.register(pg_catalog::format_type::create_format_type_udf());
362 registry.register(pg_catalog::create_pg_get_partkeydef_udf());
363 registry.register(pg_catalog::has_privilege_udf::create_has_privilege_udf(
364 "has_table_privilege",
365 ));
366 registry.register(pg_catalog::has_privilege_udf::create_has_privilege_udf(
367 "has_schema_privilege",
368 ));
369 registry.register(pg_catalog::has_privilege_udf::create_has_privilege_udf(
370 "has_database_privilege",
371 ));
372 registry.register(pg_catalog::has_privilege_udf::create_has_privilege_udf(
373 "has_any_column_privilege",
374 ));
375 registry.register_table_function(TableFunction::new(
376 "pg_get_keywords".to_string(),
377 static_tables.pg_get_keywords.clone(),
378 ));
379 registry.register(pg_catalog::create_pg_relation_is_publishable_udf());
380 registry.register(pg_catalog::create_pg_get_statisticsobjdef_columns_udf());
381 registry.register(pg_catalog::create_pg_get_userbyid_udf());
382 registry.register(pg_catalog::create_pg_table_is_visible());
383 registry.register(pg_catalog::pg_get_expr_udf::create_pg_get_expr_udf());
384 registry.register(pg_catalog::create_pg_encoding_to_char_udf());
385 registry.register(pg_catalog::create_pg_relation_size_udf());
386 registry.register(pg_catalog::create_pg_total_relation_size_udf());
387 registry.register(pg_catalog::create_pg_stat_get_numscans());
388 registry.register(pg_catalog::create_pg_get_constraintdef());
389 registry.register(pg_catalog::create_pg_get_partition_ancestors_udf());
390 registry.register_scalar(ObjDescriptionFunction::new());
391 registry.register_scalar(ColDescriptionFunction::new());
392 registry.register_scalar(ShobjDescriptionFunction::new());
393 registry.register_scalar(PgMyTempSchemaFunction::default());
394 }
395}
396
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::Field;
    use datafusion::arrow::array::Array;
    use datafusion_common::ScalarValue;
    use datafusion_expr::ColumnarValue;

    use super::*;

    /// Wraps `args` in a `ScalarFunctionArgs` describing a batch of `number_rows` rows.
    fn create_test_args(args: Vec<ColumnarValue>, number_rows: usize) -> ScalarFunctionArgs {
        let return_field = Arc::new(Field::new("result", DataType::Utf8, true));
        let config_options = Arc::new(Default::default());

        ScalarFunctionArgs {
            args,
            arg_fields: vec![],
            number_rows,
            return_field,
            config_options,
        }
    }

    /// Asserts that `result` is an array holding exactly one row, and that the row is NULL.
    fn assert_single_null_row(result: ColumnarValue) {
        let ColumnarValue::Array(arr) = result else {
            panic!("Expected Array result");
        };
        assert_eq!(1, arr.len());
        assert!(arr.is_null(0));
    }

    #[test]
    fn test_obj_description_function() {
        let func = ObjDescriptionFunction::new();
        assert_eq!("obj_description", func.name());
        assert_eq!(DataType::Utf8, func.return_type(&[]).unwrap());

        let inputs = vec![
            ColumnarValue::Scalar(ScalarValue::Int64(Some(1234))),
            ColumnarValue::Scalar(ScalarValue::Utf8(Some("pg_class".to_string()))),
        ];
        let result = func.invoke_with_args(create_test_args(inputs, 1)).unwrap();
        assert_single_null_row(result);
    }

    #[test]
    fn test_col_description_function() {
        let func = ColDescriptionFunction::new();
        assert_eq!("col_description", func.name());
        assert_eq!(DataType::Utf8, func.return_type(&[]).unwrap());

        let inputs = vec![
            ColumnarValue::Scalar(ScalarValue::Int64(Some(1234))),
            ColumnarValue::Scalar(ScalarValue::Int64(Some(1))),
        ];
        let result = func.invoke_with_args(create_test_args(inputs, 1)).unwrap();
        assert_single_null_row(result);
    }

    #[test]
    fn test_shobj_description_function() {
        let func = ShobjDescriptionFunction::new();
        assert_eq!("shobj_description", func.name());
        assert_eq!(DataType::Utf8, func.return_type(&[]).unwrap());

        let inputs = vec![
            ColumnarValue::Scalar(ScalarValue::Int64(Some(1))),
            ColumnarValue::Scalar(ScalarValue::Utf8(Some("pg_database".to_string()))),
        ];
        let result = func.invoke_with_args(create_test_args(inputs, 1)).unwrap();
        assert_single_null_row(result);
    }
}