common_function/system/
pg_catalog.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod version;
16
17use std::sync::Arc;
18
19use common_catalog::consts::{
20    DEFAULT_PRIVATE_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, PG_CATALOG_NAME,
21};
22use datafusion::arrow::array::{ArrayRef, StringArray, as_boolean_array};
23use datafusion::catalog::TableFunction;
24use datafusion::common::ScalarValue;
25use datafusion::common::utils::SingleRowListArrayBuilder;
26use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
27use datafusion_pg_catalog::pg_catalog::{self, PgCatalogStaticTables};
28use datatypes::arrow::datatypes::{DataType, Field};
29use version::PGVersionFunction;
30
31use crate::function::{Function, find_function_context};
32use crate::function_registry::FunctionRegistry;
33use crate::system::define_nullary_udf;
34
// SQL-visible names of the session functions defined in this file. Each constant
// is what the matching `Function::name` implementation below returns.
35const CURRENT_SCHEMA_FUNCTION_NAME: &str = "current_schema";
36const CURRENT_SCHEMAS_FUNCTION_NAME: &str = "current_schemas";
37const SESSION_USER_FUNCTION_NAME: &str = "session_user";
38const CURRENT_DATABASE_FUNCTION_NAME: &str = "current_database";
39
// Declare the function types through `define_nullary_udf!` (imported from
// `crate::system`; its definition is not visible here). The impls below read a
// `signature` field from each type, and `PGCatalogFunction::register` builds
// them with `Default::default()`.
// NOTE(review): `CurrentSchemasFunction::invoke_with_args` reads a boolean
// argument, yet the type is declared with the *nullary* macro — confirm that the
// generated signature really accepts one argument.
40define_nullary_udf!(CurrentSchemaFunction);
41define_nullary_udf!(CurrentSchemasFunction);
42define_nullary_udf!(SessionUserFunction);
43define_nullary_udf!(CurrentDatabaseFunction);
44
45impl Function for CurrentDatabaseFunction {
46    fn name(&self) -> &str {
47        CURRENT_DATABASE_FUNCTION_NAME
48    }
49
50    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
51        Ok(DataType::Utf8View)
52    }
53
54    fn signature(&self) -> &Signature {
55        &self.signature
56    }
57
58    fn invoke_with_args(
59        &self,
60        args: ScalarFunctionArgs,
61    ) -> datafusion_common::Result<ColumnarValue> {
62        let func_ctx = find_function_context(&args)?;
63        let db = func_ctx.query_ctx.current_catalog().to_string();
64
65        Ok(ColumnarValue::Scalar(ScalarValue::Utf8View(Some(db))))
66    }
67}
68
69// Though "current_schema" can be aliased to "database", to not cause any breaking changes,
70// we are not doing it: not until https://github.com/apache/datafusion/issues/17469 is resolved.
71impl Function for CurrentSchemaFunction {
72    fn name(&self) -> &str {
73        CURRENT_SCHEMA_FUNCTION_NAME
74    }
75
76    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
77        Ok(DataType::Utf8View)
78    }
79
80    fn signature(&self) -> &Signature {
81        &self.signature
82    }
83
84    fn invoke_with_args(
85        &self,
86        args: ScalarFunctionArgs,
87    ) -> datafusion_common::Result<ColumnarValue> {
88        let func_ctx = find_function_context(&args)?;
89        let db = func_ctx.query_ctx.current_schema();
90
91        Ok(ColumnarValue::Scalar(ScalarValue::Utf8View(Some(db))))
92    }
93}
94
95impl Function for SessionUserFunction {
96    fn name(&self) -> &str {
97        SESSION_USER_FUNCTION_NAME
98    }
99
100    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
101        Ok(DataType::Utf8View)
102    }
103
104    fn signature(&self) -> &Signature {
105        &self.signature
106    }
107
108    fn invoke_with_args(
109        &self,
110        args: ScalarFunctionArgs,
111    ) -> datafusion_common::Result<ColumnarValue> {
112        let func_ctx = find_function_context(&args)?;
113        let user = func_ctx.query_ctx.current_user();
114
115        Ok(ColumnarValue::Scalar(ScalarValue::Utf8View(Some(
116            user.username().to_string(),
117        ))))
118    }
119}
120
121impl Function for CurrentSchemasFunction {
122    fn name(&self) -> &str {
123        CURRENT_SCHEMAS_FUNCTION_NAME
124    }
125
126    fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
127        Ok(DataType::List(Arc::new(Field::new(
128            "x",
129            DataType::Utf8View,
130            false,
131        ))))
132    }
133
134    fn signature(&self) -> &Signature {
135        &self.signature
136    }
137
138    fn invoke_with_args(
139        &self,
140        args: ScalarFunctionArgs,
141    ) -> datafusion_common::Result<ColumnarValue> {
142        let args = ColumnarValue::values_to_arrays(&args.args)?;
143        let input = as_boolean_array(&args[0]);
144
145        // Create a UTF8 array with a single value
146        let mut values = vec!["public"];
147        // include implicit schemas
148        if input.value(0) {
149            values.push(INFORMATION_SCHEMA_NAME);
150            values.push(PG_CATALOG_NAME);
151            values.push(DEFAULT_PRIVATE_SCHEMA_NAME);
152        }
153
154        let list_array = SingleRowListArrayBuilder::new(Arc::new(StringArray::from(values)));
155
156        let array: ArrayRef = Arc::new(list_array.build_list_array());
157
158        Ok(ColumnarValue::Array(array))
159    }
160}
161
162pub(super) struct PGCatalogFunction;
163
164impl PGCatalogFunction {
165    pub fn register(registry: &FunctionRegistry) {
166        let static_tables =
167            Arc::new(PgCatalogStaticTables::try_new().expect("load postgres static tables"));
168
169        registry.register_scalar(PGVersionFunction::default());
170        registry.register_scalar(CurrentSchemaFunction::default());
171        registry.register_scalar(CurrentSchemasFunction::default());
172        registry.register_scalar(SessionUserFunction::default());
173        registry.register_scalar(CurrentDatabaseFunction::default());
174        registry.register(pg_catalog::format_type::create_format_type_udf());
175        registry.register(pg_catalog::create_pg_get_partkeydef_udf());
176        registry.register(pg_catalog::has_privilege_udf::create_has_privilege_udf(
177            "has_table_privilege",
178        ));
179        registry.register(pg_catalog::has_privilege_udf::create_has_privilege_udf(
180            "has_schema_privilege",
181        ));
182        registry.register(pg_catalog::has_privilege_udf::create_has_privilege_udf(
183            "has_database_privilege",
184        ));
185        registry.register(pg_catalog::has_privilege_udf::create_has_privilege_udf(
186            "has_any_column_privilege",
187        ));
188        registry.register_table_function(TableFunction::new(
189            "pg_get_keywords".to_string(),
190            static_tables.pg_get_keywords.clone(),
191        ));
192        registry.register(pg_catalog::create_pg_relation_is_publishable_udf());
193        registry.register(pg_catalog::create_pg_get_statisticsobjdef_columns_udf());
194        registry.register(pg_catalog::create_pg_get_userbyid_udf());
195        registry.register(pg_catalog::create_pg_table_is_visible());
196        registry.register(pg_catalog::pg_get_expr_udf::create_pg_get_expr_udf());
197        registry.register(pg_catalog::create_pg_encoding_to_char_udf());
198        registry.register(pg_catalog::create_pg_relation_size_udf());
199        registry.register(pg_catalog::create_pg_total_relation_size_udf());
200        registry.register(pg_catalog::create_pg_stat_get_numscans());
201        registry.register(pg_catalog::create_pg_get_constraintdef());
202    }
203}