catalog/system_schema/pg_catalog/
pg_class.rs1use std::fmt;
16use std::sync::{Arc, Weak};
17
18use arrow_schema::SchemaRef as ArrowSchemaRef;
19use common_catalog::consts::PG_CATALOG_PG_CLASS_TABLE_ID;
20use common_error::ext::BoxedError;
21use common_recordbatch::adapter::RecordBatchStreamAdapter;
22use common_recordbatch::{DfSendableRecordBatchStream, RecordBatch};
23use datafusion::execution::TaskContext;
24use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
25use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
26use datatypes::scalars::ScalarVectorBuilder;
27use datatypes::schema::{Schema, SchemaRef};
28use datatypes::value::Value;
29use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder, VectorRef};
30use futures::TryStreamExt;
31use snafu::{OptionExt, ResultExt};
32use store_api::storage::ScanRequest;
33use table::metadata::TableType;
34
35use crate::error::{
36 CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
37};
38use crate::information_schema::Predicates;
39use crate::system_schema::pg_catalog::pg_namespace::oid_map::PGNamespaceOidMapRef;
40use crate::system_schema::pg_catalog::{query_ctx, OID_COLUMN_NAME, PG_CLASS};
41use crate::system_schema::utils::tables::{string_column, u32_column};
42use crate::system_schema::SystemTable;
43use crate::CatalogManager;
44
/// Name of the string column that holds the table name of each row.
pub const RELNAME: &str = "relname";
/// Name of the `u32` column that holds the oid of the schema owning the table.
pub const RELNAMESPACE: &str = "relnamespace";
/// Name of the string column that holds the relation kind
/// ([`RELKIND_TABLE`] or [`RELKIND_VIEW`]).
pub const RELKIND: &str = "relkind";
/// Name of the `u32` column that holds the owner id of the relation.
pub const RELOWNER: &str = "relowner";

/// `relkind` value emitted for every relation that is not a view.
pub const RELKIND_TABLE: &str = "r";
/// `relkind` value emitted for relations whose table type is a view.
pub const RELKIND_VIEW: &str = "v";

/// Initial capacity handed to every column vector builder.
const INIT_CAPACITY: usize = 42;
/// Placeholder written into the `relowner` column of every row.
const DUMMY_OWNER_ID: u32 = 0;
59
60pub(super) struct PGClass {
62 schema: SchemaRef,
63 catalog_name: String,
64 catalog_manager: Weak<dyn CatalogManager>,
65
66 namespace_oid_map: PGNamespaceOidMapRef,
68}
69
70impl PGClass {
71 pub(super) fn new(
72 catalog_name: String,
73 catalog_manager: Weak<dyn CatalogManager>,
74 namespace_oid_map: PGNamespaceOidMapRef,
75 ) -> Self {
76 Self {
77 schema: Self::schema(),
78 catalog_name,
79 catalog_manager,
80 namespace_oid_map,
81 }
82 }
83
84 fn schema() -> SchemaRef {
85 Arc::new(Schema::new(vec![
86 u32_column(OID_COLUMN_NAME),
87 string_column(RELNAME),
88 u32_column(RELNAMESPACE),
89 string_column(RELKIND),
90 u32_column(RELOWNER),
91 ]))
92 }
93
94 fn builder(&self) -> PGClassBuilder {
95 PGClassBuilder::new(
96 self.schema.clone(),
97 self.catalog_name.clone(),
98 self.catalog_manager.clone(),
99 self.namespace_oid_map.clone(),
100 )
101 }
102}
103
104impl fmt::Debug for PGClass {
105 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
106 f.debug_struct("PGClass")
107 .field("schema", &self.schema)
108 .field("catalog_name", &self.catalog_name)
109 .finish()
110 }
111}
112
113impl SystemTable for PGClass {
114 fn table_id(&self) -> table::metadata::TableId {
115 PG_CATALOG_PG_CLASS_TABLE_ID
116 }
117
118 fn table_name(&self) -> &'static str {
119 PG_CLASS
120 }
121
122 fn schema(&self) -> SchemaRef {
123 self.schema.clone()
124 }
125
126 fn to_stream(
127 &self,
128 request: ScanRequest,
129 ) -> Result<common_recordbatch::SendableRecordBatchStream> {
130 let schema = self.schema.arrow_schema().clone();
131 let mut builder = self.builder();
132 let stream = Box::pin(DfRecordBatchStreamAdapter::new(
133 schema,
134 futures::stream::once(async move {
135 builder
136 .make_class(Some(request))
137 .await
138 .map(|x| x.into_df_record_batch())
139 .map_err(Into::into)
140 }),
141 ));
142 Ok(Box::pin(
143 RecordBatchStreamAdapter::try_new(stream)
144 .map_err(BoxedError::new)
145 .context(InternalSnafu)?,
146 ))
147 }
148}
149
150impl DfPartitionStream for PGClass {
151 fn schema(&self) -> &ArrowSchemaRef {
152 self.schema.arrow_schema()
153 }
154
155 fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
156 let schema = self.schema.arrow_schema().clone();
157 let mut builder = self.builder();
158 Box::pin(DfRecordBatchStreamAdapter::new(
159 schema,
160 futures::stream::once(async move {
161 builder
162 .make_class(None)
163 .await
164 .map(|x| x.into_df_record_batch())
165 .map_err(Into::into)
166 }),
167 ))
168 }
169}
170
171struct PGClassBuilder {
175 schema: SchemaRef,
176 catalog_name: String,
177 catalog_manager: Weak<dyn CatalogManager>,
178 namespace_oid_map: PGNamespaceOidMapRef,
179
180 oid: UInt32VectorBuilder,
181 relname: StringVectorBuilder,
182 relnamespace: UInt32VectorBuilder,
183 relkind: StringVectorBuilder,
184 relowner: UInt32VectorBuilder,
185}
186
187impl PGClassBuilder {
188 fn new(
189 schema: SchemaRef,
190 catalog_name: String,
191 catalog_manager: Weak<dyn CatalogManager>,
192 namespace_oid_map: PGNamespaceOidMapRef,
193 ) -> Self {
194 Self {
195 schema,
196 catalog_name,
197 catalog_manager,
198 namespace_oid_map,
199
200 oid: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
201 relname: StringVectorBuilder::with_capacity(INIT_CAPACITY),
202 relnamespace: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
203 relkind: StringVectorBuilder::with_capacity(INIT_CAPACITY),
204 relowner: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
205 }
206 }
207
208 async fn make_class(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
209 let catalog_name = self.catalog_name.clone();
210 let catalog_manager = self
211 .catalog_manager
212 .upgrade()
213 .context(UpgradeWeakCatalogManagerRefSnafu)?;
214 let predicates = Predicates::from_scan_request(&request);
215 for schema_name in catalog_manager
216 .schema_names(&catalog_name, query_ctx())
217 .await?
218 {
219 let mut stream = catalog_manager.tables(&catalog_name, &schema_name, query_ctx());
220 while let Some(table) = stream.try_next().await? {
221 let table_info = table.table_info();
222 self.add_class(
223 &predicates,
224 table_info.table_id(),
225 &schema_name,
226 &table_info.name,
227 if table_info.table_type == TableType::View {
228 RELKIND_VIEW
229 } else {
230 RELKIND_TABLE
231 },
232 );
233 }
234 }
235 self.finish()
236 }
237
238 fn add_class(
239 &mut self,
240 predicates: &Predicates,
241 oid: u32,
242 schema: &str,
243 table: &str,
244 kind: &str,
245 ) {
246 let namespace_oid = self.namespace_oid_map.get_oid(schema);
247 let row = [
248 (OID_COLUMN_NAME, &Value::from(oid)),
249 (RELNAMESPACE, &Value::from(schema)),
250 (RELNAME, &Value::from(table)),
251 (RELKIND, &Value::from(kind)),
252 (RELOWNER, &Value::from(DUMMY_OWNER_ID)),
253 ];
254
255 if !predicates.eval(&row) {
256 return;
257 }
258
259 self.oid.push(Some(oid));
260 self.relnamespace.push(Some(namespace_oid));
261 self.relname.push(Some(table));
262 self.relkind.push(Some(kind));
263 self.relowner.push(Some(DUMMY_OWNER_ID));
264 }
265
266 fn finish(&mut self) -> Result<RecordBatch> {
267 let columns: Vec<VectorRef> = vec![
268 Arc::new(self.oid.finish()),
269 Arc::new(self.relname.finish()),
270 Arc::new(self.relnamespace.finish()),
271 Arc::new(self.relkind.finish()),
272 Arc::new(self.relowner.finish()),
273 ];
274 RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
275 }
276}