catalog/system_schema/pg_catalog/pg_class.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt;
use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::PG_CATALOG_PG_CLASS_TABLE_ID;
use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{DfSendableRecordBatchStream, RecordBatch};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder, VectorRef};
use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::ScanRequest;
use table::metadata::TableType;

use crate::error::{
    CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::Predicates;
use crate::system_schema::pg_catalog::pg_namespace::oid_map::PGNamespaceOidMapRef;
use crate::system_schema::pg_catalog::{query_ctx, OID_COLUMN_NAME, PG_CLASS};
use crate::system_schema::utils::tables::{string_column, u32_column};
use crate::system_schema::SystemTable;
use crate::CatalogManager;

// === column name ===
pub const RELNAME: &str = "relname";
pub const RELNAMESPACE: &str = "relnamespace";
pub const RELKIND: &str = "relkind";
pub const RELOWNER: &str = "relowner";

// === enum value of relkind ===
pub const RELKIND_TABLE: &str = "r";
pub const RELKIND_VIEW: &str = "v";

/// The initial capacity of the vector builders.
const INIT_CAPACITY: usize = 42;
/// The dummy owner id reported in `relowner`; there is no user system yet.
const DUMMY_OWNER_ID: u32 = 0;

/// The `pg_catalog.pg_class` table implementation.
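/// Each table or view known to the catalog manager becomes one row with the
/// commonly queried columns: `oid`, `relname`, `relnamespace`, `relkind` and
/// `relowner`.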
pub(super) struct PGClass {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,

    // Workaround to convert schema_name to a numeric id
    namespace_oid_map: PGNamespaceOidMapRef,
}

impl PGClass {
    pub(super) fn new(
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
        namespace_oid_map: PGNamespaceOidMapRef,
    ) -> Self {
        Self {
            schema: Self::schema(),
            catalog_name,
            catalog_manager,
            namespace_oid_map,
        }
    }

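    /// Defines the column schema of `pg_class`: the `oid` column plus the
    /// `rel*` columns declared above.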
    fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            u32_column(OID_COLUMN_NAME),
            string_column(RELNAME),
            u32_column(RELNAMESPACE),
            string_column(RELKIND),
            u32_column(RELOWNER),
        ]))
    }

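    /// Creates a [`PGClassBuilder`] that scans the catalog and assembles the
    /// rows on demand.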
    fn builder(&self) -> PGClassBuilder {
        PGClassBuilder::new(
            self.schema.clone(),
            self.catalog_name.clone(),
            self.catalog_manager.clone(),
            self.namespace_oid_map.clone(),
        )
    }
}

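// Manual `Debug` implementation: the weak catalog manager handle and the
// namespace oid map are omitted from the output.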
impl fmt::Debug for PGClass {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("PGClass")
            .field("schema", &self.schema)
            .field("catalog_name", &self.catalog_name)
            .finish()
    }
}

impl SystemTable for PGClass {
    fn table_id(&self) -> table::metadata::TableId {
        PG_CATALOG_PG_CLASS_TABLE_ID
    }

    fn table_name(&self) -> &'static str {
        PG_CLASS
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

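    /// Produces a record batch stream for a table scan. The [`ScanRequest`] is
    /// handed to the row builder so pushed-down predicates can filter rows
    /// while the batch is being built.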
    fn to_stream(
        &self,
        request: ScanRequest,
    ) -> Result<common_recordbatch::SendableRecordBatchStream> {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_class(Some(request))
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ));
        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}

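// DataFusion integration: `pg_class` is also exposed as a partition stream.
// This path has no scan request, so no predicates are pushed down.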
impl DfPartitionStream for PGClass {
    fn schema(&self) -> &ArrowSchemaRef {
        self.schema.arrow_schema()
    }

    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_class(None)
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ))
    }
}

/// Builds the `pg_catalog.pg_class` table row by row.
/// TODO(J0HN50N133): `relowner` is always the [`DUMMY_OWNER_ID`] because there
/// is no user system yet. Once one exists, report the actual owner of the table.
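/// Each `*VectorBuilder` field accumulates one column; rows are appended by
/// `add_class` and turned into a [`RecordBatch`] by `finish`.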
struct PGClassBuilder {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,
    namespace_oid_map: PGNamespaceOidMapRef,

    oid: UInt32VectorBuilder,
    relname: StringVectorBuilder,
    relnamespace: UInt32VectorBuilder,
    relkind: StringVectorBuilder,
    relowner: UInt32VectorBuilder,
}

impl PGClassBuilder {
    fn new(
        schema: SchemaRef,
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
        namespace_oid_map: PGNamespaceOidMapRef,
    ) -> Self {
        Self {
            schema,
            catalog_name,
            catalog_manager,
            namespace_oid_map,

            oid: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
            relname: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            relnamespace: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
            relkind: StringVectorBuilder::with_capacity(INIT_CAPACITY),
            relowner: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

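    /// Iterates every schema and every table visible through the catalog
    /// manager and appends one `pg_class` row per table, honoring the
    /// predicates derived from the optional scan request.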
    async fn make_class(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
        let catalog_name = self.catalog_name.clone();
        let catalog_manager = self
            .catalog_manager
            .upgrade()
            .context(UpgradeWeakCatalogManagerRefSnafu)?;
        let predicates = Predicates::from_scan_request(&request);
        for schema_name in catalog_manager
            .schema_names(&catalog_name, query_ctx())
            .await?
        {
            let mut stream = catalog_manager.tables(&catalog_name, &schema_name, query_ctx());
            while let Some(table) = stream.try_next().await? {
                let table_info = table.table_info();
                self.add_class(
                    &predicates,
                    table_info.table_id(),
                    &schema_name,
                    &table_info.name,
                    if table_info.table_type == TableType::View {
                        RELKIND_VIEW
                    } else {
                        RELKIND_TABLE
                    },
                );
            }
        }
        self.finish()
    }

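    /// Appends a single row unless the pushed-down predicates reject it. Note
    /// that the predicate row carries the schema *name* under `relnamespace`,
    /// while the stored column holds the oid mapped from that name.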
    fn add_class(
        &mut self,
        predicates: &Predicates,
        oid: u32,
        schema: &str,
        table: &str,
        kind: &str,
    ) {
        let namespace_oid = self.namespace_oid_map.get_oid(schema);
        let row = [
            (OID_COLUMN_NAME, &Value::from(oid)),
            (RELNAMESPACE, &Value::from(schema)),
            (RELNAME, &Value::from(table)),
            (RELKIND, &Value::from(kind)),
            (RELOWNER, &Value::from(DUMMY_OWNER_ID)),
        ];

        if !predicates.eval(&row) {
            return;
        }

        self.oid.push(Some(oid));
        self.relnamespace.push(Some(namespace_oid));
        self.relname.push(Some(table));
        self.relkind.push(Some(kind));
        self.relowner.push(Some(DUMMY_OWNER_ID));
    }

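    /// Consumes the column builders and assembles the final [`RecordBatch`];
    /// the column order must match [`PGClass::schema`].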
    fn finish(&mut self) -> Result<RecordBatch> {
        let columns: Vec<VectorRef> = vec![
            Arc::new(self.oid.finish()),
            Arc::new(self.relname.finish()),
            Arc::new(self.relnamespace.finish()),
            Arc::new(self.relkind.finish()),
            Arc::new(self.relowner.finish()),
        ];
        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }
}