catalog/system_schema/pg_catalog.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef;
use async_trait::async_trait;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, PG_CATALOG_NAME, PG_CATALOG_TABLE_ID_START};
use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_telemetry::warn;
use datafusion::datasource::TableType;
use datafusion::error::DataFusionError;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion_pg_catalog::pg_catalog::catalog_info::CatalogInfo;
use datafusion_pg_catalog::pg_catalog::context::EmptyContextProvider;
use datafusion_pg_catalog::pg_catalog::{
    PG_CATALOG_TABLES, PgCatalogSchemaProvider, PgCatalogStaticTables, PgCatalogTable,
};
use snafu::ResultExt;
use store_api::storage::ScanRequest;
use table::TableRef;
use table::metadata::TableId;

use crate::CatalogManager;
use crate::error::{InternalSnafu, ProjectSchemaSnafu, Result};
use crate::system_schema::{
    SystemSchemaProvider, SystemSchemaProviderInner, SystemTable, SystemTableRef,
};

/// [`PGCatalogProvider`] is the provider for the schema named `pg_catalog`; it is not a catalog itself.
pub struct PGCatalogProvider {
    catalog_name: String,
    inner: PgCatalogSchemaProvider<CatalogManagerWrapper, EmptyContextProvider>,
    tables: HashMap<String, TableRef>,
    table_ids: HashMap<&'static str, u32>,
}

impl SystemSchemaProvider for PGCatalogProvider {
    fn tables(&self) -> &HashMap<String, TableRef> {
        assert!(!self.tables.is_empty());

        &self.tables
    }
}

impl PGCatalogProvider {
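    /// Creates a `pg_catalog` provider scoped to `catalog_name`, backed by the given
    /// catalog manager, and eagerly builds all `pg_catalog` tables.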
    pub fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
        // Safe to expect here: building the static tables only reads schema
        // definitions, which is covered by sqlness tests.
        let static_tables =
            PgCatalogStaticTables::try_new().expect("Failed to initialize static tables");
        let inner = PgCatalogSchemaProvider::try_new(
            CatalogManagerWrapper {
                catalog_name: catalog_name.clone(),
                catalog_manager,
            },
            Arc::new(static_tables),
            EmptyContextProvider,
        )
        .expect("Failed to initialize PgCatalogSchemaProvider");

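        // Assign table ids sequentially, starting from `PG_CATALOG_TABLE_ID_START`,
        // so each `pg_catalog` table gets a deterministic id.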
        let mut table_ids = HashMap::new();
        let mut table_id = PG_CATALOG_TABLE_ID_START;
        for name in PG_CATALOG_TABLES {
            table_ids.insert(*name, table_id);
            table_id += 1;
        }

        let mut provider = Self {
            catalog_name,
            inner,
            tables: HashMap::new(),
            table_ids,
        };
        provider.build_tables();
        provider
    }

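    /// Materializes every table listed in `PG_CATALOG_TABLES` and stores the results
    /// in `self.tables`.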
    fn build_tables(&mut self) {
        // SECURITY NOTE:
        // Must follow the same security rules as [`InformationSchemaProvider::build_tables`].
        let mut tables = HashMap::new();
        // Safe to unwrap here: the table name constants are handled correctly inside `system_table`.
        for name in PG_CATALOG_TABLES {
            if let Some(table) = self.build_table(name) {
                tables.insert(name.to_string(), table);
            }
        }

        self.tables = tables;
    }
}

impl SystemSchemaProviderInner for PGCatalogProvider {
    fn schema_name() -> &'static str {
        PG_CATALOG_NAME
    }

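    /// Looks up the reserved table id for `name` and wraps the corresponding
    /// `datafusion_pg_catalog` table provider as a [`SystemTable`].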
    fn system_table(&self, name: &str) -> Option<SystemTableRef> {
        if let Some((table_name, table_id)) = self.table_ids.get_key_value(name) {
            let table = self.inner.build_table_by_name(name).expect(name);

            if let Some(table) = table {
                if let Ok(system_table) = DFTableProviderAsSystemTable::try_new(
                    *table_id,
                    table_name,
                    table::metadata::TableType::Temporary,
                    table,
                ) {
                    Some(Arc::new(system_table))
                } else {
                    warn!("failed to create pg_catalog system table {}", name);
                    None
                }
            } else {
                None
            }
        } else {
            None
        }
    }

    fn catalog_name(&self) -> &str {
        &self.catalog_name
    }
}

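/// A [`CatalogInfo`] implementation that exposes GreptimeDB catalog metadata to
/// `datafusion_pg_catalog`, scoped to the catalog this schema provider was built for.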
#[derive(Clone)]
pub struct CatalogManagerWrapper {
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,
}

impl CatalogManagerWrapper {
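    /// Upgrades the weak catalog manager reference, returning a DataFusion internal
    /// error if the catalog manager has already been dropped.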
    fn catalog_manager(&self) -> std::result::Result<Arc<dyn CatalogManager>, DataFusionError> {
        self.catalog_manager.upgrade().ok_or_else(|| {
            DataFusionError::Internal("Failed to access catalog manager".to_string())
        })
    }
}

impl std::fmt::Debug for CatalogManagerWrapper {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CatalogManagerWrapper").finish()
    }
}

#[async_trait]
impl CatalogInfo for CatalogManagerWrapper {
    async fn catalog_names(&self) -> std::result::Result<Vec<String>, DataFusionError> {
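        // Only the default catalog can list all catalogs; any other catalog only
        // sees itself.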
        if self.catalog_name == DEFAULT_CATALOG_NAME {
            CatalogManager::catalog_names(self.catalog_manager()?.as_ref())
                .await
                .map_err(|e| DataFusionError::External(Box::new(e)))
        } else {
            Ok(vec![self.catalog_name.clone()])
        }
    }

    async fn schema_names(
        &self,
        catalog_name: &str,
    ) -> std::result::Result<Option<Vec<String>>, DataFusionError> {
        self.catalog_manager()?
            .schema_names(catalog_name, None)
            .await
            .map(Some)
            .map_err(|e| DataFusionError::External(Box::new(e)))
    }

    async fn table_names(
        &self,
        catalog_name: &str,
        schema_name: &str,
    ) -> std::result::Result<Option<Vec<String>>, DataFusionError> {
        self.catalog_manager()?
            .table_names(catalog_name, schema_name, None)
            .await
            .map(Some)
            .map_err(|e| DataFusionError::External(Box::new(e)))
    }

    async fn table_schema(
        &self,
        catalog_name: &str,
        schema_name: &str,
        table_name: &str,
    ) -> std::result::Result<Option<SchemaRef>, DataFusionError> {
        let table = self
            .catalog_manager()?
            .table(catalog_name, schema_name, table_name, None)
            .await
            .map_err(|e| DataFusionError::External(Box::new(e)))?;

        Ok(table.map(|t| t.schema().arrow_schema().clone()))
    }

    async fn table_type(
        &self,
        catalog_name: &str,
        schema_name: &str,
        table_name: &str,
    ) -> std::result::Result<Option<TableType>, DataFusionError> {
        let table = self
            .catalog_manager()?
            .table(catalog_name, schema_name, table_name, None)
            .await
            .map_err(|e| DataFusionError::External(Box::new(e)))?;

        Ok(table.map(|t| t.table_type().into()))
    }
}

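/// Adapts a `datafusion_pg_catalog` table provider so it can be served as a
/// GreptimeDB [`SystemTable`] with a fixed table id, name, and type.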
struct DFTableProviderAsSystemTable {
    pub table_id: TableId,
    pub table_name: &'static str,
    pub table_type: table::metadata::TableType,
    pub schema: Arc<datatypes::schema::Schema>,
    pub table_provider: PgCatalogTable,
}

impl DFTableProviderAsSystemTable {
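    /// Converts the provider's Arrow schema into a GreptimeDB schema and bundles it
    /// with the table metadata; fails if the schema conversion is unsupported.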
    pub fn try_new(
        table_id: TableId,
        table_name: &'static str,
        table_type: table::metadata::TableType,
        table_provider: PgCatalogTable,
    ) -> Result<Self> {
        let arrow_schema = table_provider.schema();
        let schema = Arc::new(arrow_schema.try_into().context(ProjectSchemaSnafu)?);
        Ok(Self {
            table_id,
            table_name,
            table_type,
            schema,
            table_provider,
        })
    }
}

impl SystemTable for DFTableProviderAsSystemTable {
    fn table_id(&self) -> TableId {
        self.table_id
    }

    fn table_name(&self) -> &'static str {
        self.table_name
    }

    fn schema(&self) -> Arc<datatypes::schema::Schema> {
        self.schema.clone()
    }

    fn table_type(&self) -> table::metadata::TableType {
        self.table_type
    }

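    // Turns the underlying pg_catalog table into a record batch stream; the scan
    // request (projection/filters) is currently ignored.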
    fn to_stream(&self, _request: ScanRequest) -> Result<SendableRecordBatchStream> {
        match &self.table_provider {
            PgCatalogTable::Static(table) => {
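                // Static tables carry pre-built record batches; replay them as a stream.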
                let schema = self.schema.arrow_schema().clone();
                let data = table
                    .data()
                    .iter()
                    .map(|rb| Ok(rb.clone()))
                    .collect::<Vec<_>>();
                let stream = Box::pin(DfRecordBatchStreamAdapter::new(
                    schema,
                    futures::stream::iter(data),
                ));
                Ok(Box::pin(
                    RecordBatchStreamAdapter::try_new(stream)
                        .map_err(BoxedError::new)
                        .context(InternalSnafu)?,
                ))
            }

            PgCatalogTable::Dynamic(table) => {
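                // Dynamic tables compute their rows at query time by executing the
                // underlying plan with a default task context.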
                let stream = table.execute(Arc::new(TaskContext::default()));
                Ok(Box::pin(
                    RecordBatchStreamAdapter::try_new(stream)
                        .map_err(BoxedError::new)
                        .context(InternalSnafu)?,
                ))
            }

            PgCatalogTable::Empty(_) => {
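                // Empty tables produce a schema-only stream with no rows.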
                let schema = self.schema.arrow_schema().clone();
                let stream = Box::pin(DfRecordBatchStreamAdapter::new(
                    schema,
                    futures::stream::iter(vec![]),
                ));
                Ok(Box::pin(
                    RecordBatchStreamAdapter::try_new(stream)
                        .map_err(BoxedError::new)
                        .context(InternalSnafu)?,
                ))
            }
        }
    }
}