common_function/system/
pg_catalog.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use common_catalog::consts::{
18    DEFAULT_PRIVATE_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, PG_CATALOG_NAME,
19};
20use datafusion::arrow::array::{ArrayRef, StringArray, StringBuilder, as_boolean_array};
21use datafusion::catalog::TableFunction;
22use datafusion::common::ScalarValue;
23use datafusion::common::utils::SingleRowListArrayBuilder;
24use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, TypeSignature, Volatility};
25use datafusion_pg_catalog::pg_catalog::{self, PgCatalogStaticTables};
26use datatypes::arrow::datatypes::{DataType, Field};
27use derive_more::derive::Display;
28
29use crate::function::{Function, find_function_context};
30use crate::function_registry::FunctionRegistry;
31use crate::system::define_nullary_udf;
32
// SQL-visible names of the functions implemented in this file. Each constant
// is returned by the `name()` method of the matching `Function` impl below.
const CURRENT_SCHEMA_FUNCTION_NAME: &str = "current_schema";
const CURRENT_SCHEMAS_FUNCTION_NAME: &str = "current_schemas";
const SESSION_USER_FUNCTION_NAME: &str = "session_user";
const CURRENT_DATABASE_FUNCTION_NAME: &str = "current_database";
const OBJ_DESCRIPTION_FUNCTION_NAME: &str = "obj_description";
const COL_DESCRIPTION_FUNCTION_NAME: &str = "col_description";
const SHOBJ_DESCRIPTION_FUNCTION_NAME: &str = "shobj_description";
const PG_MY_TEMP_SCHEMA_FUNCTION_NAME: &str = "pg_my_temp_schema";

// Functions that take no arguments. `define_nullary_udf!` is imported from
// `crate::system`; presumably it generates each struct with a `signature`
// field and a `Default` impl, since the `Function` impls and
// `PGCatalogFunction::register` below rely on both — confirm against the
// macro definition.
define_nullary_udf!(CurrentSchemaFunction);
define_nullary_udf!(SessionUserFunction);
define_nullary_udf!(CurrentDatabaseFunction);
define_nullary_udf!(PgMyTempSchemaFunction);
46
47impl Function for CurrentDatabaseFunction {
48    fn name(&self) -> &str {
49        CURRENT_DATABASE_FUNCTION_NAME
50    }
51
52    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
53        Ok(DataType::Utf8View)
54    }
55
56    fn signature(&self) -> &Signature {
57        &self.signature
58    }
59
60    fn invoke_with_args(
61        &self,
62        args: ScalarFunctionArgs,
63    ) -> datafusion_common::Result<ColumnarValue> {
64        let func_ctx = find_function_context(&args)?;
65        let db = func_ctx.query_ctx.current_catalog().to_string();
66
67        Ok(ColumnarValue::Scalar(ScalarValue::Utf8View(Some(db))))
68    }
69}
70
71// Though "current_schema" can be aliased to "database", to not cause any breaking changes,
72// we are not doing it: not until https://github.com/apache/datafusion/issues/17469 is resolved.
73impl Function for CurrentSchemaFunction {
74    fn name(&self) -> &str {
75        CURRENT_SCHEMA_FUNCTION_NAME
76    }
77
78    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
79        Ok(DataType::Utf8View)
80    }
81
82    fn signature(&self) -> &Signature {
83        &self.signature
84    }
85
86    fn invoke_with_args(
87        &self,
88        args: ScalarFunctionArgs,
89    ) -> datafusion_common::Result<ColumnarValue> {
90        let func_ctx = find_function_context(&args)?;
91        let db = func_ctx.query_ctx.current_schema();
92
93        Ok(ColumnarValue::Scalar(ScalarValue::Utf8View(Some(db))))
94    }
95}
96
97impl Function for SessionUserFunction {
98    fn name(&self) -> &str {
99        SESSION_USER_FUNCTION_NAME
100    }
101
102    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
103        Ok(DataType::Utf8View)
104    }
105
106    fn signature(&self) -> &Signature {
107        &self.signature
108    }
109
110    fn invoke_with_args(
111        &self,
112        args: ScalarFunctionArgs,
113    ) -> datafusion_common::Result<ColumnarValue> {
114        let func_ctx = find_function_context(&args)?;
115        let user = func_ctx.query_ctx.current_user();
116
117        Ok(ColumnarValue::Scalar(ScalarValue::Utf8View(Some(
118            user.username().to_string(),
119        ))))
120    }
121}
122
123#[derive(Display, Debug)]
124#[display("{}", self.name())]
125pub(super) struct CurrentSchemasFunction {
126    signature: Signature,
127}
128
129impl CurrentSchemasFunction {
130    pub fn new() -> Self {
131        Self {
132            signature: Signature::new(
133                TypeSignature::Exact(vec![DataType::Boolean]),
134                Volatility::Stable,
135            ),
136        }
137    }
138}
139
140impl Function for CurrentSchemasFunction {
141    fn name(&self) -> &str {
142        CURRENT_SCHEMAS_FUNCTION_NAME
143    }
144
145    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
146        Ok(DataType::List(Arc::new(Field::new(
147            "item",
148            DataType::Utf8,
149            true,
150        ))))
151    }
152
153    fn signature(&self) -> &Signature {
154        &self.signature
155    }
156
157    fn invoke_with_args(
158        &self,
159        args: ScalarFunctionArgs,
160    ) -> datafusion_common::Result<ColumnarValue> {
161        let args = ColumnarValue::values_to_arrays(&args.args)?;
162        let input = as_boolean_array(&args[0]);
163
164        // Create a UTF8 array with a single value
165        let mut values = vec!["public"];
166        // include implicit schemas
167        if input.value(0) {
168            values.push(INFORMATION_SCHEMA_NAME);
169            values.push(PG_CATALOG_NAME);
170            values.push(DEFAULT_PRIVATE_SCHEMA_NAME);
171        }
172
173        let list_array = SingleRowListArrayBuilder::new(Arc::new(StringArray::from(values)));
174
175        let array: ArrayRef = Arc::new(list_array.build_list_array());
176
177        Ok(ColumnarValue::Array(array))
178    }
179}
180
181/// PostgreSQL obj_description - returns NULL for compatibility
182#[derive(Display, Debug, Clone)]
183#[display("{}", self.name())]
184pub(super) struct ObjDescriptionFunction {
185    signature: Signature,
186}
187
188impl ObjDescriptionFunction {
189    pub fn new() -> Self {
190        Self {
191            signature: Signature::one_of(
192                vec![
193                    TypeSignature::Exact(vec![DataType::Int64, DataType::Utf8]),
194                    TypeSignature::Exact(vec![DataType::UInt32, DataType::Utf8]),
195                    TypeSignature::Exact(vec![DataType::Int64]),
196                    TypeSignature::Exact(vec![DataType::UInt32]),
197                ],
198                Volatility::Stable,
199            ),
200        }
201    }
202}
203
204impl Function for ObjDescriptionFunction {
205    fn name(&self) -> &str {
206        OBJ_DESCRIPTION_FUNCTION_NAME
207    }
208
209    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
210        Ok(DataType::Utf8)
211    }
212
213    fn signature(&self) -> &Signature {
214        &self.signature
215    }
216
217    fn invoke_with_args(
218        &self,
219        args: ScalarFunctionArgs,
220    ) -> datafusion_common::Result<ColumnarValue> {
221        let num_rows = args.number_rows;
222        let mut builder = StringBuilder::with_capacity(num_rows, 0);
223        for _ in 0..num_rows {
224            builder.append_null();
225        }
226        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
227    }
228}
229
230/// PostgreSQL col_description - returns NULL for compatibility
231#[derive(Display, Debug, Clone)]
232#[display("{}", self.name())]
233pub(super) struct ColDescriptionFunction {
234    signature: Signature,
235}
236
237impl ColDescriptionFunction {
238    pub fn new() -> Self {
239        Self {
240            signature: Signature::one_of(
241                vec![
242                    TypeSignature::Exact(vec![DataType::Int64, DataType::Int32]),
243                    TypeSignature::Exact(vec![DataType::UInt32, DataType::Int32]),
244                    TypeSignature::Exact(vec![DataType::Int64, DataType::Int64]),
245                    TypeSignature::Exact(vec![DataType::UInt32, DataType::Int64]),
246                ],
247                Volatility::Stable,
248            ),
249        }
250    }
251}
252
253impl Function for ColDescriptionFunction {
254    fn name(&self) -> &str {
255        COL_DESCRIPTION_FUNCTION_NAME
256    }
257
258    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
259        Ok(DataType::Utf8)
260    }
261
262    fn signature(&self) -> &Signature {
263        &self.signature
264    }
265
266    fn invoke_with_args(
267        &self,
268        args: ScalarFunctionArgs,
269    ) -> datafusion_common::Result<ColumnarValue> {
270        let num_rows = args.number_rows;
271        let mut builder = StringBuilder::with_capacity(num_rows, 0);
272        for _ in 0..num_rows {
273            builder.append_null();
274        }
275        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
276    }
277}
278
279/// PostgreSQL shobj_description - returns NULL for compatibility
280#[derive(Display, Debug, Clone)]
281#[display("{}", self.name())]
282pub(super) struct ShobjDescriptionFunction {
283    signature: Signature,
284}
285
286impl ShobjDescriptionFunction {
287    pub fn new() -> Self {
288        Self {
289            signature: Signature::one_of(
290                vec![
291                    TypeSignature::Exact(vec![DataType::Int64, DataType::Utf8]),
292                    TypeSignature::Exact(vec![DataType::UInt64, DataType::Utf8]),
293                    TypeSignature::Exact(vec![DataType::Int32, DataType::Utf8]),
294                    TypeSignature::Exact(vec![DataType::UInt32, DataType::Utf8]),
295                ],
296                Volatility::Stable,
297            ),
298        }
299    }
300}
301
302impl Function for ShobjDescriptionFunction {
303    fn name(&self) -> &str {
304        SHOBJ_DESCRIPTION_FUNCTION_NAME
305    }
306
307    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
308        Ok(DataType::Utf8)
309    }
310
311    fn signature(&self) -> &Signature {
312        &self.signature
313    }
314
315    fn invoke_with_args(
316        &self,
317        args: ScalarFunctionArgs,
318    ) -> datafusion_common::Result<ColumnarValue> {
319        let num_rows = args.number_rows;
320        let mut builder = StringBuilder::with_capacity(num_rows, 0);
321        for _ in 0..num_rows {
322            builder.append_null();
323        }
324        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
325    }
326}
327
328/// PostgreSQL pg_my_temp_schema - returns 0 (no temp schema) for compatibility
329impl Function for PgMyTempSchemaFunction {
330    fn name(&self) -> &str {
331        PG_MY_TEMP_SCHEMA_FUNCTION_NAME
332    }
333
334    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
335        Ok(DataType::UInt32)
336    }
337
338    fn signature(&self) -> &Signature {
339        &self.signature
340    }
341
342    fn invoke_with_args(
343        &self,
344        _args: ScalarFunctionArgs,
345    ) -> datafusion_common::Result<ColumnarValue> {
346        Ok(ColumnarValue::Scalar(ScalarValue::UInt32(Some(0))))
347    }
348}
349
350pub(super) struct PGCatalogFunction;
351
352impl PGCatalogFunction {
353    pub fn register(registry: &FunctionRegistry) {
354        let static_tables =
355            Arc::new(PgCatalogStaticTables::try_new().expect("load postgres static tables"));
356
357        registry.register_scalar(CurrentSchemaFunction::default());
358        registry.register_scalar(CurrentSchemasFunction::new());
359        registry.register_scalar(SessionUserFunction::default());
360        registry.register_scalar(CurrentDatabaseFunction::default());
361        registry.register(pg_catalog::format_type::create_format_type_udf());
362        registry.register(pg_catalog::create_pg_get_partkeydef_udf());
363        registry.register(pg_catalog::has_privilege_udf::create_has_privilege_udf(
364            "has_table_privilege",
365        ));
366        registry.register(pg_catalog::has_privilege_udf::create_has_privilege_udf(
367            "has_schema_privilege",
368        ));
369        registry.register(pg_catalog::has_privilege_udf::create_has_privilege_udf(
370            "has_database_privilege",
371        ));
372        registry.register(pg_catalog::has_privilege_udf::create_has_privilege_udf(
373            "has_any_column_privilege",
374        ));
375        registry.register_table_function(TableFunction::new(
376            "pg_get_keywords".to_string(),
377            static_tables.pg_get_keywords.clone(),
378        ));
379        registry.register(pg_catalog::create_pg_relation_is_publishable_udf());
380        registry.register(pg_catalog::create_pg_get_statisticsobjdef_columns_udf());
381        registry.register(pg_catalog::create_pg_get_userbyid_udf());
382        registry.register(pg_catalog::create_pg_table_is_visible());
383        registry.register(pg_catalog::pg_get_expr_udf::create_pg_get_expr_udf());
384        registry.register(pg_catalog::create_pg_encoding_to_char_udf());
385        registry.register(pg_catalog::create_pg_relation_size_udf());
386        registry.register(pg_catalog::create_pg_total_relation_size_udf());
387        registry.register(pg_catalog::create_pg_stat_get_numscans());
388        registry.register(pg_catalog::create_pg_get_constraintdef());
389        registry.register(pg_catalog::create_pg_get_partition_ancestors_udf());
390        registry.register(pg_catalog::quote_ident_udf::create_quote_ident_udf());
391        registry.register(pg_catalog::quote_ident_udf::create_parse_ident_udf());
392        registry.register_scalar(ObjDescriptionFunction::new());
393        registry.register_scalar(ColDescriptionFunction::new());
394        registry.register_scalar(ShobjDescriptionFunction::new());
395        registry.register_scalar(PgMyTempSchemaFunction::default());
396    }
397}
398
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::Field;
    use datafusion::arrow::array::Array;
    use datafusion_common::ScalarValue;
    use datafusion_expr::ColumnarValue;

    use super::*;

    /// Wraps `args` into `ScalarFunctionArgs` with a nullable `Utf8` return
    /// field, no argument fields and default config options.
    fn create_test_args(args: Vec<ColumnarValue>, number_rows: usize) -> ScalarFunctionArgs {
        let return_field = Arc::new(Field::new("result", DataType::Utf8, true));

        ScalarFunctionArgs {
            args,
            arg_fields: vec![],
            number_rows,
            return_field,
            config_options: Arc::new(Default::default()),
        }
    }

    /// Asserts that `result` is an array holding exactly one NULL.
    fn assert_single_null(result: ColumnarValue) {
        let ColumnarValue::Array(arr) = result else {
            panic!("Expected Array result");
        };
        assert_eq!(1, arr.len());
        assert!(arr.is_null(0));
    }

    #[test]
    fn test_obj_description_function() {
        let func = ObjDescriptionFunction::new();
        assert_eq!("obj_description", func.name());
        assert_eq!(DataType::Utf8, func.return_type(&[]).unwrap());

        let oid = ColumnarValue::Scalar(ScalarValue::Int64(Some(1234)));
        let catalog = ColumnarValue::Scalar(ScalarValue::Utf8(Some("pg_class".to_string())));
        let args = create_test_args(vec![oid, catalog], 1);

        assert_single_null(func.invoke_with_args(args).unwrap());
    }

    #[test]
    fn test_col_description_function() {
        let func = ColDescriptionFunction::new();
        assert_eq!("col_description", func.name());
        assert_eq!(DataType::Utf8, func.return_type(&[]).unwrap());

        let oid = ColumnarValue::Scalar(ScalarValue::Int64(Some(1234)));
        let column = ColumnarValue::Scalar(ScalarValue::Int64(Some(1)));
        let args = create_test_args(vec![oid, column], 1);

        assert_single_null(func.invoke_with_args(args).unwrap());
    }

    #[test]
    fn test_shobj_description_function() {
        let func = ShobjDescriptionFunction::new();
        assert_eq!("shobj_description", func.name());
        assert_eq!(DataType::Utf8, func.return_type(&[]).unwrap());

        let oid = ColumnarValue::Scalar(ScalarValue::Int64(Some(1)));
        let catalog = ColumnarValue::Scalar(ScalarValue::Utf8(Some("pg_database".to_string())));
        let args = create_test_args(vec![oid, catalog], 1);

        assert_single_null(func.invoke_with_args(args).unwrap());
    }
}
485}