catalog/system_schema/pg_catalog/pg_namespace.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! The `pg_catalog.pg_namespace` table implementation.
//! A namespace corresponds to a schema in GreptimeDB.
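//!
//! Like its PostgreSQL counterpart, the table exposes one row per schema in the catalog and
//! can be queried with plain SQL, e.g. `SELECT oid, nspname FROM pg_catalog.pg_namespace;`.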

pub(super) mod oid_map;

use std::fmt;
use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::PG_CATALOG_PG_NAMESPACE_TABLE_ID;
use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{DfSendableRecordBatchStream, RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder, VectorRef};
use snafu::{OptionExt, ResultExt};
use store_api::storage::ScanRequest;

use crate::error::{
    CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::Predicates;
use crate::system_schema::pg_catalog::{
    query_ctx, PGNamespaceOidMapRef, OID_COLUMN_NAME, PG_NAMESPACE,
};
use crate::system_schema::utils::tables::{string_column, u32_column};
use crate::system_schema::SystemTable;
use crate::CatalogManager;

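/// Name of the schema-name column, matching PostgreSQL's `pg_namespace.nspname`.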
const NSPNAME: &str = "nspname";
const INIT_CAPACITY: usize = 42;

pub(super) struct PGNamespace {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,

    // Workaround to convert schema_name to a numeric id
    oid_map: PGNamespaceOidMapRef,
}

impl PGNamespace {
    pub(super) fn new(
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
        oid_map: PGNamespaceOidMapRef,
    ) -> Self {
        Self {
            schema: Self::schema(),
            catalog_name,
            catalog_manager,
            oid_map,
        }
    }

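    /// Builds the table schema: a `u32` `oid` column and a string `nspname` column.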
    fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            // TODO(J0HN50N133): we do not have a numeric schema id, use schema name as a workaround. Use a proper schema id once we have it.
            u32_column(OID_COLUMN_NAME),
            string_column(NSPNAME),
        ]))
    }

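    /// Creates a [`PGNamespaceBuilder`] that shares this table's schema, catalog name and oid map.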
    fn builder(&self) -> PGNamespaceBuilder {
        PGNamespaceBuilder::new(
            self.schema.clone(),
            self.catalog_name.clone(),
            self.catalog_manager.clone(),
            self.oid_map.clone(),
        )
    }
}

impl fmt::Debug for PGNamespace {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("PGNamespace")
            .field("schema", &self.schema)
            .field("catalog_name", &self.catalog_name)
            .finish()
    }
}

impl SystemTable for PGNamespace {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn table_id(&self) -> table::metadata::TableId {
        PG_CATALOG_PG_NAMESPACE_TABLE_ID
    }

    fn table_name(&self) -> &'static str {
        PG_NAMESPACE
    }

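    /// Builds a stream that lazily constructs the table contents when polled,
    /// applying predicates pushed down in the scan `request`.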
    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_namespace(Some(request))
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ));
        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}

impl DfPartitionStream for PGNamespace {
    fn schema(&self) -> &ArrowSchemaRef {
        self.schema.arrow_schema()
    }

    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_namespace(None)
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ))
    }
}

/// Builds the `pg_catalog.pg_namespace` table row by row.
/// `oid` is derived from the schema name as a workaround since we don't have a numeric schema id.
/// `nspname` is the schema name.
struct PGNamespaceBuilder {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,
    namespace_oid_map: PGNamespaceOidMapRef,

    oid: UInt32VectorBuilder,
    nspname: StringVectorBuilder,
}

impl PGNamespaceBuilder {
    fn new(
        schema: SchemaRef,
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
        namespace_oid_map: PGNamespaceOidMapRef,
    ) -> Self {
        Self {
            schema,
            catalog_name,
            catalog_manager,
            namespace_oid_map,
            oid: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
            nspname: StringVectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

    /// Constructs the `pg_catalog.pg_namespace` virtual table.
    async fn make_namespace(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
        let catalog_name = self.catalog_name.clone();
        let catalog_manager = self
            .catalog_manager
            .upgrade()
            .context(UpgradeWeakCatalogManagerRefSnafu)?;
        let predicates = Predicates::from_scan_request(&request);
        for schema_name in catalog_manager
            .schema_names(&catalog_name, query_ctx())
            .await?
        {
            self.add_namespace(&predicates, &schema_name);
        }
        self.finish()
    }

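    /// Finalizes the collected `oid` and `nspname` columns into a single `RecordBatch`.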
    fn finish(&mut self) -> Result<RecordBatch> {
        let columns: Vec<VectorRef> =
            vec![Arc::new(self.oid.finish()), Arc::new(self.nspname.finish())];
        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }

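    /// Appends one row for `schema_name`, skipping it if it does not satisfy the pushed-down predicates.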
    fn add_namespace(&mut self, predicates: &Predicates, schema_name: &str) {
        let oid = self.namespace_oid_map.get_oid(schema_name);
        let row = [
            (OID_COLUMN_NAME, &Value::from(oid)),
            (NSPNAME, &Value::from(schema_name)),
        ];
        if !predicates.eval(&row) {
            return;
        }
        self.oid.push(Some(oid));
        self.nspname.push(Some(schema_name));
    }
}