common_function/system/
pg_catalog.rs1mod version;
16
17use std::sync::Arc;
18
19use common_catalog::consts::{
20 DEFAULT_PRIVATE_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, PG_CATALOG_NAME,
21};
22use datafusion::arrow::array::{ArrayRef, StringArray, as_boolean_array};
23use datafusion::catalog::TableFunction;
24use datafusion::common::ScalarValue;
25use datafusion::common::utils::SingleRowListArrayBuilder;
26use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
27use datafusion_pg_catalog::pg_catalog::{self, PgCatalogStaticTables};
28use datatypes::arrow::datatypes::{DataType, Field};
29use version::PGVersionFunction;
30
31use crate::function::{Function, find_function_context};
32use crate::function_registry::FunctionRegistry;
33use crate::system::define_nullary_udf;
34
35const CURRENT_SCHEMA_FUNCTION_NAME: &str = "current_schema";
36const CURRENT_SCHEMAS_FUNCTION_NAME: &str = "current_schemas";
37const SESSION_USER_FUNCTION_NAME: &str = "session_user";
38const CURRENT_DATABASE_FUNCTION_NAME: &str = "current_database";
39
40define_nullary_udf!(CurrentSchemaFunction);
41define_nullary_udf!(CurrentSchemasFunction);
42define_nullary_udf!(SessionUserFunction);
43define_nullary_udf!(CurrentDatabaseFunction);
44
45impl Function for CurrentDatabaseFunction {
46 fn name(&self) -> &str {
47 CURRENT_DATABASE_FUNCTION_NAME
48 }
49
50 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
51 Ok(DataType::Utf8View)
52 }
53
54 fn signature(&self) -> &Signature {
55 &self.signature
56 }
57
58 fn invoke_with_args(
59 &self,
60 args: ScalarFunctionArgs,
61 ) -> datafusion_common::Result<ColumnarValue> {
62 let func_ctx = find_function_context(&args)?;
63 let db = func_ctx.query_ctx.current_catalog().to_string();
64
65 Ok(ColumnarValue::Scalar(ScalarValue::Utf8View(Some(db))))
66 }
67}
68
69impl Function for CurrentSchemaFunction {
72 fn name(&self) -> &str {
73 CURRENT_SCHEMA_FUNCTION_NAME
74 }
75
76 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
77 Ok(DataType::Utf8View)
78 }
79
80 fn signature(&self) -> &Signature {
81 &self.signature
82 }
83
84 fn invoke_with_args(
85 &self,
86 args: ScalarFunctionArgs,
87 ) -> datafusion_common::Result<ColumnarValue> {
88 let func_ctx = find_function_context(&args)?;
89 let db = func_ctx.query_ctx.current_schema();
90
91 Ok(ColumnarValue::Scalar(ScalarValue::Utf8View(Some(db))))
92 }
93}
94
95impl Function for SessionUserFunction {
96 fn name(&self) -> &str {
97 SESSION_USER_FUNCTION_NAME
98 }
99
100 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
101 Ok(DataType::Utf8View)
102 }
103
104 fn signature(&self) -> &Signature {
105 &self.signature
106 }
107
108 fn invoke_with_args(
109 &self,
110 args: ScalarFunctionArgs,
111 ) -> datafusion_common::Result<ColumnarValue> {
112 let func_ctx = find_function_context(&args)?;
113 let user = func_ctx.query_ctx.current_user();
114
115 Ok(ColumnarValue::Scalar(ScalarValue::Utf8View(Some(
116 user.username().to_string(),
117 ))))
118 }
119}
120
121impl Function for CurrentSchemasFunction {
122 fn name(&self) -> &str {
123 CURRENT_SCHEMAS_FUNCTION_NAME
124 }
125
126 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
127 Ok(DataType::List(Arc::new(Field::new(
128 "x",
129 DataType::Utf8View,
130 false,
131 ))))
132 }
133
134 fn signature(&self) -> &Signature {
135 &self.signature
136 }
137
138 fn invoke_with_args(
139 &self,
140 args: ScalarFunctionArgs,
141 ) -> datafusion_common::Result<ColumnarValue> {
142 let args = ColumnarValue::values_to_arrays(&args.args)?;
143 let input = as_boolean_array(&args[0]);
144
145 let mut values = vec!["public"];
147 if input.value(0) {
149 values.push(INFORMATION_SCHEMA_NAME);
150 values.push(PG_CATALOG_NAME);
151 values.push(DEFAULT_PRIVATE_SCHEMA_NAME);
152 }
153
154 let list_array = SingleRowListArrayBuilder::new(Arc::new(StringArray::from(values)));
155
156 let array: ArrayRef = Arc::new(list_array.build_list_array());
157
158 Ok(ColumnarValue::Array(array))
159 }
160}
161
162pub(super) struct PGCatalogFunction;
163
164impl PGCatalogFunction {
165 pub fn register(registry: &FunctionRegistry) {
166 let static_tables =
167 Arc::new(PgCatalogStaticTables::try_new().expect("load postgres static tables"));
168
169 registry.register_scalar(PGVersionFunction::default());
170 registry.register_scalar(CurrentSchemaFunction::default());
171 registry.register_scalar(CurrentSchemasFunction::default());
172 registry.register_scalar(SessionUserFunction::default());
173 registry.register_scalar(CurrentDatabaseFunction::default());
174 registry.register(pg_catalog::format_type::create_format_type_udf());
175 registry.register(pg_catalog::create_pg_get_partkeydef_udf());
176 registry.register(pg_catalog::has_privilege_udf::create_has_privilege_udf(
177 "has_table_privilege",
178 ));
179 registry.register(pg_catalog::has_privilege_udf::create_has_privilege_udf(
180 "has_schema_privilege",
181 ));
182 registry.register(pg_catalog::has_privilege_udf::create_has_privilege_udf(
183 "has_database_privilege",
184 ));
185 registry.register(pg_catalog::has_privilege_udf::create_has_privilege_udf(
186 "has_any_column_privilege",
187 ));
188 registry.register_table_function(TableFunction::new(
189 "pg_get_keywords".to_string(),
190 static_tables.pg_get_keywords.clone(),
191 ));
192 registry.register(pg_catalog::create_pg_relation_is_publishable_udf());
193 registry.register(pg_catalog::create_pg_get_statisticsobjdef_columns_udf());
194 registry.register(pg_catalog::create_pg_get_userbyid_udf());
195 registry.register(pg_catalog::create_pg_table_is_visible());
196 registry.register(pg_catalog::pg_get_expr_udf::create_pg_get_expr_udf());
197 registry.register(pg_catalog::create_pg_encoding_to_char_udf());
198 registry.register(pg_catalog::create_pg_relation_size_udf());
199 registry.register(pg_catalog::create_pg_total_relation_size_udf());
200 registry.register(pg_catalog::create_pg_stat_get_numscans());
201 registry.register(pg_catalog::create_pg_get_constraintdef());
202 }
203}