catalog/system_schema/
pg_catalog.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::{Arc, Weak};
17
18use arrow_schema::SchemaRef;
19use async_trait::async_trait;
20use common_catalog::consts::{DEFAULT_CATALOG_NAME, PG_CATALOG_NAME, PG_CATALOG_TABLE_ID_START};
21use common_error::ext::BoxedError;
22use common_recordbatch::SendableRecordBatchStream;
23use common_recordbatch::adapter::RecordBatchStreamAdapter;
24use common_telemetry::warn;
25use datafusion::datasource::TableType;
26use datafusion::error::DataFusionError;
27use datafusion::execution::TaskContext;
28use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
29use datafusion_pg_catalog::pg_catalog::catalog_info::CatalogInfo;
30use datafusion_pg_catalog::pg_catalog::{
31    PG_CATALOG_TABLES, PgCatalogSchemaProvider, PgCatalogStaticTables, PgCatalogTable,
32};
33use snafu::ResultExt;
34use store_api::storage::ScanRequest;
35use table::TableRef;
36use table::metadata::TableId;
37
38use crate::CatalogManager;
39use crate::error::{InternalSnafu, ProjectSchemaSnafu, Result};
40use crate::system_schema::{
41    SystemSchemaProvider, SystemSchemaProviderInner, SystemTable, SystemTableRef,
42};
43
44/// [`PGCatalogProvider`] is the provider for a schema named `pg_catalog`, it is not a catalog.
45pub struct PGCatalogProvider {
46    catalog_name: String,
47    inner: PgCatalogSchemaProvider<CatalogManagerWrapper>,
48    tables: HashMap<String, TableRef>,
49    table_ids: HashMap<&'static str, u32>,
50}
51
52impl SystemSchemaProvider for PGCatalogProvider {
53    fn tables(&self) -> &HashMap<String, TableRef> {
54        assert!(!self.tables.is_empty());
55
56        &self.tables
57    }
58}
59
60impl PGCatalogProvider {
61    pub fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
62        // safe to expect/unwrap because it contains only schema read, this can
63        // be ensured by sqlness tests
64        let static_tables =
65            PgCatalogStaticTables::try_new().expect("Failed to initialize static tables");
66        let inner = PgCatalogSchemaProvider::try_new(
67            CatalogManagerWrapper {
68                catalog_name: catalog_name.clone(),
69                catalog_manager,
70            },
71            Arc::new(static_tables),
72        )
73        .expect("Failed to initialize PgCatalogSchemaProvider");
74
75        let mut table_ids = HashMap::new();
76        let mut table_id = PG_CATALOG_TABLE_ID_START;
77        for name in PG_CATALOG_TABLES {
78            table_ids.insert(*name, table_id);
79            table_id += 1;
80        }
81
82        let mut provider = Self {
83            catalog_name,
84            inner,
85            tables: HashMap::new(),
86            table_ids,
87        };
88        provider.build_tables();
89        provider
90    }
91
92    fn build_tables(&mut self) {
93        // SECURITY NOTE:
94        // Must follow the same security rules as [`InformationSchemaProvider::build_tables`].
95        let mut tables = HashMap::new();
96        // It's safe to unwrap here because we are sure that the constants have been handle correctly inside system_table.
97        for name in PG_CATALOG_TABLES {
98            if let Some(table) = self.build_table(name) {
99                tables.insert(name.to_string(), table);
100            }
101        }
102
103        self.tables = tables;
104    }
105}
106
107impl SystemSchemaProviderInner for PGCatalogProvider {
108    fn schema_name() -> &'static str {
109        PG_CATALOG_NAME
110    }
111
112    fn system_table(&self, name: &str) -> Option<SystemTableRef> {
113        if let Some((table_name, table_id)) = self.table_ids.get_key_value(name) {
114            let table = self.inner.build_table_by_name(name).expect(name);
115
116            if let Some(table) = table {
117                if let Ok(system_table) = DFTableProviderAsSystemTable::try_new(
118                    *table_id,
119                    table_name,
120                    table::metadata::TableType::Temporary,
121                    table,
122                ) {
123                    Some(Arc::new(system_table))
124                } else {
125                    warn!("failed to create pg_catalog system table {}", name);
126                    None
127                }
128            } else {
129                None
130            }
131        } else {
132            None
133        }
134    }
135
136    fn catalog_name(&self) -> &str {
137        &self.catalog_name
138    }
139}
140
141#[derive(Clone)]
142pub struct CatalogManagerWrapper {
143    catalog_name: String,
144    catalog_manager: Weak<dyn CatalogManager>,
145}
146
147impl CatalogManagerWrapper {
148    fn catalog_manager(&self) -> std::result::Result<Arc<dyn CatalogManager>, DataFusionError> {
149        self.catalog_manager.upgrade().ok_or_else(|| {
150            DataFusionError::Internal("Failed to access catalog manager".to_string())
151        })
152    }
153}
154
155impl std::fmt::Debug for CatalogManagerWrapper {
156    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157        f.debug_struct("CatalogManagerWrapper").finish()
158    }
159}
160
161#[async_trait]
162impl CatalogInfo for CatalogManagerWrapper {
163    async fn catalog_names(&self) -> std::result::Result<Vec<String>, DataFusionError> {
164        if self.catalog_name == DEFAULT_CATALOG_NAME {
165            CatalogManager::catalog_names(self.catalog_manager()?.as_ref())
166                .await
167                .map_err(|e| DataFusionError::External(Box::new(e)))
168        } else {
169            Ok(vec![self.catalog_name.clone()])
170        }
171    }
172
173    async fn schema_names(
174        &self,
175        catalog_name: &str,
176    ) -> std::result::Result<Option<Vec<String>>, DataFusionError> {
177        self.catalog_manager()?
178            .schema_names(catalog_name, None)
179            .await
180            .map(Some)
181            .map_err(|e| DataFusionError::External(Box::new(e)))
182    }
183
184    async fn table_names(
185        &self,
186        catalog_name: &str,
187        schema_name: &str,
188    ) -> std::result::Result<Option<Vec<String>>, DataFusionError> {
189        self.catalog_manager()?
190            .table_names(catalog_name, schema_name, None)
191            .await
192            .map(Some)
193            .map_err(|e| DataFusionError::External(Box::new(e)))
194    }
195
196    async fn table_schema(
197        &self,
198        catalog_name: &str,
199        schema_name: &str,
200        table_name: &str,
201    ) -> std::result::Result<Option<SchemaRef>, DataFusionError> {
202        let table = self
203            .catalog_manager()?
204            .table(catalog_name, schema_name, table_name, None)
205            .await
206            .map_err(|e| DataFusionError::External(Box::new(e)))?;
207
208        Ok(table.map(|t| t.schema().arrow_schema().clone()))
209    }
210
211    async fn table_type(
212        &self,
213        catalog_name: &str,
214        schema_name: &str,
215        table_name: &str,
216    ) -> std::result::Result<Option<TableType>, DataFusionError> {
217        let table = self
218            .catalog_manager()?
219            .table(catalog_name, schema_name, table_name, None)
220            .await
221            .map_err(|e| DataFusionError::External(Box::new(e)))?;
222
223        Ok(table.map(|t| t.table_type().into()))
224    }
225}
226
227struct DFTableProviderAsSystemTable {
228    pub table_id: TableId,
229    pub table_name: &'static str,
230    pub table_type: table::metadata::TableType,
231    pub schema: Arc<datatypes::schema::Schema>,
232    pub table_provider: PgCatalogTable,
233}
234
235impl DFTableProviderAsSystemTable {
236    pub fn try_new(
237        table_id: TableId,
238        table_name: &'static str,
239        table_type: table::metadata::TableType,
240        table_provider: PgCatalogTable,
241    ) -> Result<Self> {
242        let arrow_schema = table_provider.schema();
243        let schema = Arc::new(arrow_schema.try_into().context(ProjectSchemaSnafu)?);
244        Ok(Self {
245            table_id,
246            table_name,
247            table_type,
248            schema,
249            table_provider,
250        })
251    }
252}
253
254impl SystemTable for DFTableProviderAsSystemTable {
255    fn table_id(&self) -> TableId {
256        self.table_id
257    }
258
259    fn table_name(&self) -> &'static str {
260        self.table_name
261    }
262
263    fn schema(&self) -> Arc<datatypes::schema::Schema> {
264        self.schema.clone()
265    }
266
267    fn table_type(&self) -> table::metadata::TableType {
268        self.table_type
269    }
270
271    fn to_stream(&self, _request: ScanRequest) -> Result<SendableRecordBatchStream> {
272        match &self.table_provider {
273            PgCatalogTable::Static(table) => {
274                let schema = self.schema.arrow_schema().clone();
275                let data = table
276                    .data()
277                    .iter()
278                    .map(|rb| Ok(rb.clone()))
279                    .collect::<Vec<_>>();
280                let stream = Box::pin(DfRecordBatchStreamAdapter::new(
281                    schema,
282                    futures::stream::iter(data),
283                ));
284                Ok(Box::pin(
285                    RecordBatchStreamAdapter::try_new(stream)
286                        .map_err(BoxedError::new)
287                        .context(InternalSnafu)?,
288                ))
289            }
290
291            PgCatalogTable::Dynamic(table) => {
292                let stream = table.execute(Arc::new(TaskContext::default()));
293                Ok(Box::pin(
294                    RecordBatchStreamAdapter::try_new(stream)
295                        .map_err(BoxedError::new)
296                        .context(InternalSnafu)?,
297                ))
298            }
299
300            PgCatalogTable::Empty(_) => {
301                let schema = self.schema.arrow_schema().clone();
302                let stream = Box::pin(DfRecordBatchStreamAdapter::new(
303                    schema,
304                    futures::stream::iter(vec![]),
305                ));
306                Ok(Box::pin(
307                    RecordBatchStreamAdapter::try_new(stream)
308                        .map_err(BoxedError::new)
309                        .context(InternalSnafu)?,
310                ))
311            }
312        }
313    }
314}