catalog/system_schema/pg_catalog/pg_database.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

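//! Implementation of the `pg_catalog.pg_database` system table.
//! GreptimeDB exposes each schema of a catalog as one `pg_database` row,
//! so PostgreSQL clients can enumerate databases the way they expect.
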
use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::PG_CATALOG_PG_DATABASE_TABLE_ID;
use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{DfSendableRecordBatchStream, RecordBatch};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder, VectorRef};
use snafu::{OptionExt, ResultExt};
use store_api::storage::ScanRequest;

use crate::error::{
    CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::Predicates;
use crate::system_schema::pg_catalog::pg_namespace::oid_map::PGNamespaceOidMapRef;
use crate::system_schema::pg_catalog::{query_ctx, OID_COLUMN_NAME, PG_DATABASE};
use crate::system_schema::utils::tables::{string_column, u32_column};
use crate::system_schema::SystemTable;
use crate::CatalogManager;

// === column name ===
pub const DATNAME: &str = "datname";

/// The initial capacity of the vector builders.
const INIT_CAPACITY: usize = 42;

/// The `pg_catalog.pg_database` table implementation.
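///
/// A minimal construction sketch (hypothetical wiring: the surrounding
/// `catalog_manager` and `namespace_oid_map` values are assumed to exist):
///
/// ```ignore
/// let table = PGDatabase::new(
///     "greptime".to_string(),
///     Arc::downgrade(&catalog_manager),
///     namespace_oid_map.clone(),
/// );
/// ```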
pub(super) struct PGDatabase {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,

    // Workaround to convert schema_name to a numeric id
    namespace_oid_map: PGNamespaceOidMapRef,
}

impl std::fmt::Debug for PGDatabase {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PGDatabase")
            .field("schema", &self.schema)
            .field("catalog_name", &self.catalog_name)
            .finish()
    }
}

impl PGDatabase {
    pub(super) fn new(
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
        namespace_oid_map: PGNamespaceOidMapRef,
    ) -> Self {
        Self {
            schema: Self::schema(),
            catalog_name,
            catalog_manager,
            namespace_oid_map,
        }
    }

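    /// Defines the table schema. Only the `oid` and `datname` columns of
    /// PostgreSQL's `pg_database` are modeled here.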
    fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            u32_column(OID_COLUMN_NAME),
            string_column(DATNAME),
        ]))
    }

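    /// Creates a fresh row builder that shares this table's schema, catalog
    /// handle and namespace oid map.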
    fn builder(&self) -> PGCDatabaseBuilder {
        PGCDatabaseBuilder::new(
            self.schema.clone(),
            self.catalog_name.clone(),
            self.catalog_manager.clone(),
            self.namespace_oid_map.clone(),
        )
    }
}

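// Exposes the table as a DataFusion `PartitionStream`. This path receives no
// `ScanRequest`, so `make_database(None)` performs a full scan with no
// predicate filtering.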
impl DfPartitionStream for PGDatabase {
    fn schema(&self) -> &ArrowSchemaRef {
        self.schema.arrow_schema()
    }

    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_database(None)
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ))
    }
}

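// `SystemTable` scan entry point. Unlike the `DfPartitionStream` path above,
// `to_stream` receives a `ScanRequest`, so simple predicates can be pushed
// down and evaluated while rows are built.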
impl SystemTable for PGDatabase {
    fn table_id(&self) -> table::metadata::TableId {
        PG_CATALOG_PG_DATABASE_TABLE_ID
    }

    fn table_name(&self) -> &'static str {
        PG_DATABASE
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn to_stream(
        &self,
        request: ScanRequest,
    ) -> Result<common_recordbatch::SendableRecordBatchStream> {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_database(Some(request))
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ));
        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}

/// Builds the `pg_catalog.pg_database` table row by row.
/// `oid` is derived from the schema name as a workaround, since we don't have
/// a numeric schema id. `datname` is the schema name.
struct PGCDatabaseBuilder {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,
    namespace_oid_map: PGNamespaceOidMapRef,

    oid: UInt32VectorBuilder,
    datname: StringVectorBuilder,
}

impl PGCDatabaseBuilder {
    fn new(
        schema: SchemaRef,
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
        namespace_oid_map: PGNamespaceOidMapRef,
    ) -> Self {
        Self {
            schema,
            catalog_name,
            catalog_manager,
            namespace_oid_map,

            oid: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
            datname: StringVectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

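    /// Collects one row per schema in the target catalog and assembles the
    /// final `RecordBatch`. When `request` is `None` (the `DfPartitionStream`
    /// path), no predicate filtering takes place.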
    async fn make_database(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
        let catalog_name = self.catalog_name.clone();
        let catalog_manager = self
            .catalog_manager
            .upgrade()
            .context(UpgradeWeakCatalogManagerRefSnafu)?;
        let predicates = Predicates::from_scan_request(&request);
        for schema_name in catalog_manager
            .schema_names(&catalog_name, query_ctx())
            .await?
        {
            self.add_database(&predicates, &schema_name);
        }
        self.finish()
    }

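    /// Pushes a single row, mapping the schema name to a stable numeric `oid`
    /// through the namespace oid map. Rows that don't satisfy the pushed-down
    /// predicates are skipped.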
    fn add_database(&mut self, predicates: &Predicates, schema_name: &str) {
        let oid = self.namespace_oid_map.get_oid(schema_name);
        let row: [(&str, &Value); 2] = [
            (OID_COLUMN_NAME, &Value::from(oid)),
            (DATNAME, &Value::from(schema_name)),
        ];

        if !predicates.eval(&row) {
            return;
        }

        self.oid.push(Some(oid));
        self.datname.push(Some(schema_name));
    }

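    /// Drains the column builders into a `RecordBatch` that matches the
    /// table schema.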
    fn finish(&mut self) -> Result<RecordBatch> {
        let columns: Vec<VectorRef> =
            vec![Arc::new(self.oid.finish()), Arc::new(self.datname.finish())];
        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }
}