common_function/system/
pg_catalog.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod version;
16
17use std::sync::Arc;
18
19use datafusion::arrow::array::{ArrayRef, StringArray, as_boolean_array};
20use datafusion::catalog::TableFunction;
21use datafusion::common::ScalarValue;
22use datafusion::common::utils::SingleRowListArrayBuilder;
23use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
24use datafusion_pg_catalog::pg_catalog::{self, PgCatalogStaticTables};
25use datatypes::arrow::datatypes::{DataType, Field};
26use version::PGVersionFunction;
27
28use crate::function::{Function, find_function_context};
29use crate::function_registry::FunctionRegistry;
30use crate::system::define_nullary_udf;
31
32const CURRENT_SCHEMA_FUNCTION_NAME: &str = "current_schema";
33const CURRENT_SCHEMAS_FUNCTION_NAME: &str = "current_schemas";
34const SESSION_USER_FUNCTION_NAME: &str = "session_user";
35
36define_nullary_udf!(CurrentSchemaFunction);
37define_nullary_udf!(CurrentSchemasFunction);
38define_nullary_udf!(SessionUserFunction);
39
40// Though "current_schema" can be aliased to "database", to not cause any breaking changes,
41// we are not doing it: not until https://github.com/apache/datafusion/issues/17469 is resolved.
42impl Function for CurrentSchemaFunction {
43    fn name(&self) -> &str {
44        CURRENT_SCHEMA_FUNCTION_NAME
45    }
46
47    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
48        Ok(DataType::Utf8View)
49    }
50
51    fn signature(&self) -> &Signature {
52        &self.signature
53    }
54
55    fn invoke_with_args(
56        &self,
57        args: ScalarFunctionArgs,
58    ) -> datafusion_common::Result<ColumnarValue> {
59        let func_ctx = find_function_context(&args)?;
60        let db = func_ctx.query_ctx.current_schema();
61
62        Ok(ColumnarValue::Scalar(ScalarValue::Utf8View(Some(db))))
63    }
64}
65
66impl Function for SessionUserFunction {
67    fn name(&self) -> &str {
68        SESSION_USER_FUNCTION_NAME
69    }
70
71    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
72        Ok(DataType::Utf8View)
73    }
74
75    fn signature(&self) -> &Signature {
76        &self.signature
77    }
78
79    fn invoke_with_args(
80        &self,
81        args: ScalarFunctionArgs,
82    ) -> datafusion_common::Result<ColumnarValue> {
83        let func_ctx = find_function_context(&args)?;
84        let user = func_ctx.query_ctx.current_user();
85
86        Ok(ColumnarValue::Scalar(ScalarValue::Utf8View(Some(
87            user.username().to_string(),
88        ))))
89    }
90}
91
92impl Function for CurrentSchemasFunction {
93    fn name(&self) -> &str {
94        CURRENT_SCHEMAS_FUNCTION_NAME
95    }
96
97    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
98        Ok(DataType::List(Arc::new(Field::new(
99            "x",
100            DataType::Utf8View,
101            false,
102        ))))
103    }
104
105    fn signature(&self) -> &Signature {
106        &self.signature
107    }
108
109    fn invoke_with_args(
110        &self,
111        args: ScalarFunctionArgs,
112    ) -> datafusion_common::Result<ColumnarValue> {
113        let args = ColumnarValue::values_to_arrays(&args.args)?;
114        let input = as_boolean_array(&args[0]);
115
116        // Create a UTF8 array with a single value
117        let mut values = vec!["public"];
118        // include implicit schemas
119        if input.value(0) {
120            values.push("information_schema");
121            values.push("pg_catalog");
122            values.push("greptime_private");
123        }
124
125        let list_array = SingleRowListArrayBuilder::new(Arc::new(StringArray::from(values)));
126
127        let array: ArrayRef = Arc::new(list_array.build_list_array());
128
129        Ok(ColumnarValue::Array(array))
130    }
131}
132
133pub(super) struct PGCatalogFunction;
134
135impl PGCatalogFunction {
136    pub fn register(registry: &FunctionRegistry) {
137        let static_tables =
138            Arc::new(PgCatalogStaticTables::try_new().expect("load postgres static tables"));
139
140        registry.register_scalar(PGVersionFunction::default());
141        registry.register_scalar(CurrentSchemaFunction::default());
142        registry.register_scalar(CurrentSchemasFunction::default());
143        registry.register_scalar(SessionUserFunction::default());
144        registry.register(pg_catalog::format_type::create_format_type_udf());
145        registry.register(pg_catalog::create_pg_get_partkeydef_udf());
146        registry.register(pg_catalog::has_privilege_udf::create_has_privilege_udf(
147            "has_table_privilege",
148        ));
149        registry.register(pg_catalog::has_privilege_udf::create_has_privilege_udf(
150            "has_schema_privilege",
151        ));
152        registry.register(pg_catalog::has_privilege_udf::create_has_privilege_udf(
153            "has_database_privilege",
154        ));
155        registry.register(pg_catalog::has_privilege_udf::create_has_privilege_udf(
156            "has_any_column_privilege",
157        ));
158        registry.register_table_function(TableFunction::new(
159            "pg_get_keywords".to_string(),
160            static_tables.pg_get_keywords.clone(),
161        ));
162        registry.register(pg_catalog::create_pg_relation_is_publishable_udf());
163        registry.register(pg_catalog::create_pg_get_statisticsobjdef_columns_udf());
164        registry.register(pg_catalog::create_pg_get_userbyid_udf());
165        registry.register(pg_catalog::create_pg_table_is_visible());
166        registry.register(pg_catalog::pg_get_expr_udf::create_pg_get_expr_udf());
167        // TODO(sunng87): upgrade datafusion to add
168        //registry.register(pg_catalog::create_pg_encoding_to_char_udf());
169    }
170}