common_function/system/
pg_catalog.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use common_catalog::consts::{
18    DEFAULT_PRIVATE_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, PG_CATALOG_NAME,
19};
20use datafusion::arrow::array::{ArrayRef, StringArray, StringBuilder, as_boolean_array};
21use datafusion::catalog::TableFunction;
22use datafusion::common::ScalarValue;
23use datafusion::common::utils::SingleRowListArrayBuilder;
24use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, TypeSignature, Volatility};
25use datafusion_pg_catalog::pg_catalog::{self, PgCatalogStaticTables};
26use datatypes::arrow::datatypes::{DataType, Field};
27use derive_more::derive::Display;
28
29use crate::function::{Function, find_function_context};
30use crate::function_registry::FunctionRegistry;
31use crate::system::define_nullary_udf;
32
33const CURRENT_SCHEMA_FUNCTION_NAME: &str = "current_schema";
34const CURRENT_SCHEMAS_FUNCTION_NAME: &str = "current_schemas";
35const SESSION_USER_FUNCTION_NAME: &str = "session_user";
36const CURRENT_DATABASE_FUNCTION_NAME: &str = "current_database";
37const OBJ_DESCRIPTION_FUNCTION_NAME: &str = "obj_description";
38const COL_DESCRIPTION_FUNCTION_NAME: &str = "col_description";
39const SHOBJ_DESCRIPTION_FUNCTION_NAME: &str = "shobj_description";
40const PG_MY_TEMP_SCHEMA_FUNCTION_NAME: &str = "pg_my_temp_schema";
41
42define_nullary_udf!(CurrentSchemaFunction);
43define_nullary_udf!(SessionUserFunction);
44define_nullary_udf!(CurrentDatabaseFunction);
45define_nullary_udf!(PgMyTempSchemaFunction);
46
47impl Function for CurrentDatabaseFunction {
48    fn name(&self) -> &str {
49        CURRENT_DATABASE_FUNCTION_NAME
50    }
51
52    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
53        Ok(DataType::Utf8View)
54    }
55
56    fn signature(&self) -> &Signature {
57        &self.signature
58    }
59
60    fn invoke_with_args(
61        &self,
62        args: ScalarFunctionArgs,
63    ) -> datafusion_common::Result<ColumnarValue> {
64        let func_ctx = find_function_context(&args)?;
65        let db = func_ctx.query_ctx.current_catalog().to_string();
66
67        Ok(ColumnarValue::Scalar(ScalarValue::Utf8View(Some(db))))
68    }
69}
70
71// Though "current_schema" can be aliased to "database", to not cause any breaking changes,
72// we are not doing it: not until https://github.com/apache/datafusion/issues/17469 is resolved.
73impl Function for CurrentSchemaFunction {
74    fn name(&self) -> &str {
75        CURRENT_SCHEMA_FUNCTION_NAME
76    }
77
78    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
79        Ok(DataType::Utf8View)
80    }
81
82    fn signature(&self) -> &Signature {
83        &self.signature
84    }
85
86    fn invoke_with_args(
87        &self,
88        args: ScalarFunctionArgs,
89    ) -> datafusion_common::Result<ColumnarValue> {
90        let func_ctx = find_function_context(&args)?;
91        let db = func_ctx.query_ctx.current_schema();
92
93        Ok(ColumnarValue::Scalar(ScalarValue::Utf8View(Some(db))))
94    }
95}
96
97impl Function for SessionUserFunction {
98    fn name(&self) -> &str {
99        SESSION_USER_FUNCTION_NAME
100    }
101
102    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
103        Ok(DataType::Utf8View)
104    }
105
106    fn signature(&self) -> &Signature {
107        &self.signature
108    }
109
110    fn invoke_with_args(
111        &self,
112        args: ScalarFunctionArgs,
113    ) -> datafusion_common::Result<ColumnarValue> {
114        let func_ctx = find_function_context(&args)?;
115        let user = func_ctx.query_ctx.current_user();
116
117        Ok(ColumnarValue::Scalar(ScalarValue::Utf8View(Some(
118            user.username().to_string(),
119        ))))
120    }
121}
122
123#[derive(Display, Debug)]
124#[display("{}", self.name())]
125pub(super) struct CurrentSchemasFunction {
126    signature: Signature,
127}
128
129impl CurrentSchemasFunction {
130    pub fn new() -> Self {
131        Self {
132            signature: Signature::new(
133                TypeSignature::Exact(vec![DataType::Boolean]),
134                Volatility::Stable,
135            ),
136        }
137    }
138}
139
140impl Function for CurrentSchemasFunction {
141    fn name(&self) -> &str {
142        CURRENT_SCHEMAS_FUNCTION_NAME
143    }
144
145    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
146        Ok(DataType::List(Arc::new(Field::new(
147            "item",
148            DataType::Utf8,
149            true,
150        ))))
151    }
152
153    fn signature(&self) -> &Signature {
154        &self.signature
155    }
156
157    fn invoke_with_args(
158        &self,
159        args: ScalarFunctionArgs,
160    ) -> datafusion_common::Result<ColumnarValue> {
161        let args = ColumnarValue::values_to_arrays(&args.args)?;
162        let input = as_boolean_array(&args[0]);
163
164        // Create a UTF8 array with a single value
165        let mut values = vec!["public"];
166        // include implicit schemas
167        if input.value(0) {
168            values.push(INFORMATION_SCHEMA_NAME);
169            values.push(PG_CATALOG_NAME);
170            values.push(DEFAULT_PRIVATE_SCHEMA_NAME);
171        }
172
173        let list_array = SingleRowListArrayBuilder::new(Arc::new(StringArray::from(values)));
174
175        let array: ArrayRef = Arc::new(list_array.build_list_array());
176
177        Ok(ColumnarValue::Array(array))
178    }
179}
180
181/// PostgreSQL obj_description - returns NULL for compatibility
182#[derive(Display, Debug, Clone)]
183#[display("{}", self.name())]
184pub(super) struct ObjDescriptionFunction {
185    signature: Signature,
186}
187
188impl ObjDescriptionFunction {
189    pub fn new() -> Self {
190        Self {
191            signature: Signature::one_of(
192                vec![
193                    TypeSignature::Exact(vec![DataType::Int64, DataType::Utf8]),
194                    TypeSignature::Exact(vec![DataType::UInt32, DataType::Utf8]),
195                    TypeSignature::Exact(vec![DataType::Int64]),
196                    TypeSignature::Exact(vec![DataType::UInt32]),
197                ],
198                Volatility::Stable,
199            ),
200        }
201    }
202}
203
204impl Function for ObjDescriptionFunction {
205    fn name(&self) -> &str {
206        OBJ_DESCRIPTION_FUNCTION_NAME
207    }
208
209    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
210        Ok(DataType::Utf8)
211    }
212
213    fn signature(&self) -> &Signature {
214        &self.signature
215    }
216
217    fn invoke_with_args(
218        &self,
219        args: ScalarFunctionArgs,
220    ) -> datafusion_common::Result<ColumnarValue> {
221        let num_rows = args.number_rows;
222        let mut builder = StringBuilder::with_capacity(num_rows, 0);
223        for _ in 0..num_rows {
224            builder.append_null();
225        }
226        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
227    }
228}
229
230/// PostgreSQL col_description - returns NULL for compatibility
231#[derive(Display, Debug, Clone)]
232#[display("{}", self.name())]
233pub(super) struct ColDescriptionFunction {
234    signature: Signature,
235}
236
237impl ColDescriptionFunction {
238    pub fn new() -> Self {
239        Self {
240            signature: Signature::one_of(
241                vec![
242                    TypeSignature::Exact(vec![DataType::Int64, DataType::Int32]),
243                    TypeSignature::Exact(vec![DataType::UInt32, DataType::Int32]),
244                    TypeSignature::Exact(vec![DataType::Int64, DataType::Int64]),
245                    TypeSignature::Exact(vec![DataType::UInt32, DataType::Int64]),
246                ],
247                Volatility::Stable,
248            ),
249        }
250    }
251}
252
253impl Function for ColDescriptionFunction {
254    fn name(&self) -> &str {
255        COL_DESCRIPTION_FUNCTION_NAME
256    }
257
258    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
259        Ok(DataType::Utf8)
260    }
261
262    fn signature(&self) -> &Signature {
263        &self.signature
264    }
265
266    fn invoke_with_args(
267        &self,
268        args: ScalarFunctionArgs,
269    ) -> datafusion_common::Result<ColumnarValue> {
270        let num_rows = args.number_rows;
271        let mut builder = StringBuilder::with_capacity(num_rows, 0);
272        for _ in 0..num_rows {
273            builder.append_null();
274        }
275        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
276    }
277}
278
279/// PostgreSQL shobj_description - returns NULL for compatibility
280#[derive(Display, Debug, Clone)]
281#[display("{}", self.name())]
282pub(super) struct ShobjDescriptionFunction {
283    signature: Signature,
284}
285
286impl ShobjDescriptionFunction {
287    pub fn new() -> Self {
288        Self {
289            signature: Signature::one_of(
290                vec![
291                    TypeSignature::Exact(vec![DataType::Int64, DataType::Utf8]),
292                    TypeSignature::Exact(vec![DataType::UInt64, DataType::Utf8]),
293                    TypeSignature::Exact(vec![DataType::Int32, DataType::Utf8]),
294                    TypeSignature::Exact(vec![DataType::UInt32, DataType::Utf8]),
295                ],
296                Volatility::Stable,
297            ),
298        }
299    }
300}
301
302impl Function for ShobjDescriptionFunction {
303    fn name(&self) -> &str {
304        SHOBJ_DESCRIPTION_FUNCTION_NAME
305    }
306
307    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
308        Ok(DataType::Utf8)
309    }
310
311    fn signature(&self) -> &Signature {
312        &self.signature
313    }
314
315    fn invoke_with_args(
316        &self,
317        args: ScalarFunctionArgs,
318    ) -> datafusion_common::Result<ColumnarValue> {
319        let num_rows = args.number_rows;
320        let mut builder = StringBuilder::with_capacity(num_rows, 0);
321        for _ in 0..num_rows {
322            builder.append_null();
323        }
324        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
325    }
326}
327
328/// PostgreSQL pg_my_temp_schema - returns 0 (no temp schema) for compatibility
329impl Function for PgMyTempSchemaFunction {
330    fn name(&self) -> &str {
331        PG_MY_TEMP_SCHEMA_FUNCTION_NAME
332    }
333
334    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
335        Ok(DataType::UInt32)
336    }
337
338    fn signature(&self) -> &Signature {
339        &self.signature
340    }
341
342    fn invoke_with_args(
343        &self,
344        _args: ScalarFunctionArgs,
345    ) -> datafusion_common::Result<ColumnarValue> {
346        Ok(ColumnarValue::Scalar(ScalarValue::UInt32(Some(0))))
347    }
348}
349
350pub(super) struct PGCatalogFunction;
351
352impl PGCatalogFunction {
353    pub fn register(registry: &FunctionRegistry) {
354        let static_tables =
355            Arc::new(PgCatalogStaticTables::try_new().expect("load postgres static tables"));
356
357        registry.register_scalar(CurrentSchemaFunction::default());
358        registry.register_scalar(CurrentSchemasFunction::new());
359        registry.register_scalar(SessionUserFunction::default());
360        registry.register_scalar(CurrentDatabaseFunction::default());
361        registry.register(pg_catalog::format_type::create_format_type_udf());
362        registry.register(pg_catalog::create_pg_get_partkeydef_udf());
363        registry.register(pg_catalog::has_privilege_udf::create_has_privilege_udf(
364            "has_table_privilege",
365        ));
366        registry.register(pg_catalog::has_privilege_udf::create_has_privilege_udf(
367            "has_schema_privilege",
368        ));
369        registry.register(pg_catalog::has_privilege_udf::create_has_privilege_udf(
370            "has_database_privilege",
371        ));
372        registry.register(pg_catalog::has_privilege_udf::create_has_privilege_udf(
373            "has_any_column_privilege",
374        ));
375        registry.register_table_function(TableFunction::new(
376            "pg_get_keywords".to_string(),
377            static_tables.pg_get_keywords.clone(),
378        ));
379        registry.register(pg_catalog::create_pg_relation_is_publishable_udf());
380        registry.register(pg_catalog::create_pg_get_statisticsobjdef_columns_udf());
381        registry.register(pg_catalog::create_pg_get_userbyid_udf());
382        registry.register(pg_catalog::create_pg_table_is_visible());
383        registry.register(pg_catalog::pg_get_expr_udf::create_pg_get_expr_udf());
384        registry.register(pg_catalog::create_pg_encoding_to_char_udf());
385        registry.register(pg_catalog::create_pg_relation_size_udf());
386        registry.register(pg_catalog::create_pg_total_relation_size_udf());
387        registry.register(pg_catalog::create_pg_stat_get_numscans());
388        registry.register(pg_catalog::create_pg_get_constraintdef());
389        registry.register(pg_catalog::create_pg_get_partition_ancestors_udf());
390        registry.register_scalar(ObjDescriptionFunction::new());
391        registry.register_scalar(ColDescriptionFunction::new());
392        registry.register_scalar(ShobjDescriptionFunction::new());
393        registry.register_scalar(PgMyTempSchemaFunction::default());
394    }
395}
396
397#[cfg(test)]
398mod tests {
399    use std::sync::Arc;
400
401    use arrow_schema::Field;
402    use datafusion::arrow::array::Array;
403    use datafusion_common::ScalarValue;
404    use datafusion_expr::ColumnarValue;
405
406    use super::*;
407
408    fn create_test_args(args: Vec<ColumnarValue>, number_rows: usize) -> ScalarFunctionArgs {
409        ScalarFunctionArgs {
410            args,
411            arg_fields: vec![],
412            number_rows,
413            return_field: Arc::new(Field::new("result", DataType::Utf8, true)),
414            config_options: Arc::new(Default::default()),
415        }
416    }
417
418    #[test]
419    fn test_obj_description_function() {
420        let func = ObjDescriptionFunction::new();
421        assert_eq!("obj_description", func.name());
422        assert_eq!(DataType::Utf8, func.return_type(&[]).unwrap());
423
424        let args = create_test_args(
425            vec![
426                ColumnarValue::Scalar(ScalarValue::Int64(Some(1234))),
427                ColumnarValue::Scalar(ScalarValue::Utf8(Some("pg_class".to_string()))),
428            ],
429            1,
430        );
431        let result = func.invoke_with_args(args).unwrap();
432        if let ColumnarValue::Array(arr) = result {
433            assert_eq!(1, arr.len());
434            assert!(arr.is_null(0));
435        } else {
436            panic!("Expected Array result");
437        }
438    }
439
440    #[test]
441    fn test_col_description_function() {
442        let func = ColDescriptionFunction::new();
443        assert_eq!("col_description", func.name());
444        assert_eq!(DataType::Utf8, func.return_type(&[]).unwrap());
445
446        let args = create_test_args(
447            vec![
448                ColumnarValue::Scalar(ScalarValue::Int64(Some(1234))),
449                ColumnarValue::Scalar(ScalarValue::Int64(Some(1))),
450            ],
451            1,
452        );
453        let result = func.invoke_with_args(args).unwrap();
454        if let ColumnarValue::Array(arr) = result {
455            assert_eq!(1, arr.len());
456            assert!(arr.is_null(0));
457        } else {
458            panic!("Expected Array result");
459        }
460    }
461
462    #[test]
463    fn test_shobj_description_function() {
464        let func = ShobjDescriptionFunction::new();
465        assert_eq!("shobj_description", func.name());
466        assert_eq!(DataType::Utf8, func.return_type(&[]).unwrap());
467
468        let args = create_test_args(
469            vec![
470                ColumnarValue::Scalar(ScalarValue::Int64(Some(1))),
471                ColumnarValue::Scalar(ScalarValue::Utf8(Some("pg_database".to_string()))),
472            ],
473            1,
474        );
475        let result = func.invoke_with_args(args).unwrap();
476        if let ColumnarValue::Array(arr) = result {
477            assert_eq!(1, arr.len());
478            assert!(arr.is_null(0));
479        } else {
480            panic!("Expected Array result");
481        }
482    }
483}