common_function/system/
pg_catalog.rs1mod version;
16
17use std::sync::Arc;
18
19use datafusion::arrow::array::{ArrayRef, StringArray, as_boolean_array};
20use datafusion::catalog::TableFunction;
21use datafusion::common::ScalarValue;
22use datafusion::common::utils::SingleRowListArrayBuilder;
23use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
24use datafusion_pg_catalog::pg_catalog::{self, PgCatalogStaticTables};
25use datatypes::arrow::datatypes::{DataType, Field};
26use version::PGVersionFunction;
27
28use crate::function::{Function, find_function_context};
29use crate::function_registry::FunctionRegistry;
30use crate::system::define_nullary_udf;
31
32const CURRENT_SCHEMA_FUNCTION_NAME: &str = "current_schema";
33const CURRENT_SCHEMAS_FUNCTION_NAME: &str = "current_schemas";
34const SESSION_USER_FUNCTION_NAME: &str = "session_user";
35
36define_nullary_udf!(CurrentSchemaFunction);
37define_nullary_udf!(CurrentSchemasFunction);
38define_nullary_udf!(SessionUserFunction);
39
40impl Function for CurrentSchemaFunction {
43 fn name(&self) -> &str {
44 CURRENT_SCHEMA_FUNCTION_NAME
45 }
46
47 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
48 Ok(DataType::Utf8View)
49 }
50
51 fn signature(&self) -> &Signature {
52 &self.signature
53 }
54
55 fn invoke_with_args(
56 &self,
57 args: ScalarFunctionArgs,
58 ) -> datafusion_common::Result<ColumnarValue> {
59 let func_ctx = find_function_context(&args)?;
60 let db = func_ctx.query_ctx.current_schema();
61
62 Ok(ColumnarValue::Scalar(ScalarValue::Utf8View(Some(db))))
63 }
64}
65
66impl Function for SessionUserFunction {
67 fn name(&self) -> &str {
68 SESSION_USER_FUNCTION_NAME
69 }
70
71 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
72 Ok(DataType::Utf8View)
73 }
74
75 fn signature(&self) -> &Signature {
76 &self.signature
77 }
78
79 fn invoke_with_args(
80 &self,
81 args: ScalarFunctionArgs,
82 ) -> datafusion_common::Result<ColumnarValue> {
83 let func_ctx = find_function_context(&args)?;
84 let user = func_ctx.query_ctx.current_user();
85
86 Ok(ColumnarValue::Scalar(ScalarValue::Utf8View(Some(
87 user.username().to_string(),
88 ))))
89 }
90}
91
92impl Function for CurrentSchemasFunction {
93 fn name(&self) -> &str {
94 CURRENT_SCHEMAS_FUNCTION_NAME
95 }
96
97 fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
98 Ok(DataType::List(Arc::new(Field::new(
99 "x",
100 DataType::Utf8View,
101 false,
102 ))))
103 }
104
105 fn signature(&self) -> &Signature {
106 &self.signature
107 }
108
109 fn invoke_with_args(
110 &self,
111 args: ScalarFunctionArgs,
112 ) -> datafusion_common::Result<ColumnarValue> {
113 let args = ColumnarValue::values_to_arrays(&args.args)?;
114 let input = as_boolean_array(&args[0]);
115
116 let mut values = vec!["public"];
118 if input.value(0) {
120 values.push("information_schema");
121 values.push("pg_catalog");
122 values.push("greptime_private");
123 }
124
125 let list_array = SingleRowListArrayBuilder::new(Arc::new(StringArray::from(values)));
126
127 let array: ArrayRef = Arc::new(list_array.build_list_array());
128
129 Ok(ColumnarValue::Array(array))
130 }
131}
132
133pub(super) struct PGCatalogFunction;
134
135impl PGCatalogFunction {
136 pub fn register(registry: &FunctionRegistry) {
137 let static_tables =
138 Arc::new(PgCatalogStaticTables::try_new().expect("load postgres static tables"));
139
140 registry.register_scalar(PGVersionFunction::default());
141 registry.register_scalar(CurrentSchemaFunction::default());
142 registry.register_scalar(CurrentSchemasFunction::default());
143 registry.register_scalar(SessionUserFunction::default());
144 registry.register(pg_catalog::format_type::create_format_type_udf());
145 registry.register(pg_catalog::create_pg_get_partkeydef_udf());
146 registry.register(pg_catalog::has_privilege_udf::create_has_privilege_udf(
147 "has_table_privilege",
148 ));
149 registry.register(pg_catalog::has_privilege_udf::create_has_privilege_udf(
150 "has_schema_privilege",
151 ));
152 registry.register(pg_catalog::has_privilege_udf::create_has_privilege_udf(
153 "has_database_privilege",
154 ));
155 registry.register(pg_catalog::has_privilege_udf::create_has_privilege_udf(
156 "has_any_column_privilege",
157 ));
158 registry.register_table_function(TableFunction::new(
159 "pg_get_keywords".to_string(),
160 static_tables.pg_get_keywords.clone(),
161 ));
162 registry.register(pg_catalog::create_pg_relation_is_publishable_udf());
163 registry.register(pg_catalog::create_pg_get_statisticsobjdef_columns_udf());
164 registry.register(pg_catalog::create_pg_get_userbyid_udf());
165 registry.register(pg_catalog::create_pg_table_is_visible());
166 registry.register(pg_catalog::pg_get_expr_udf::create_pg_get_expr_udf());
167 }
170}