catalog/system_schema/pg_catalog/pg_namespace.rs

pub(super) mod oid_map;

use std::fmt;
use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::PG_CATALOG_PG_NAMESPACE_TABLE_ID;
use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{DfSendableRecordBatchStream, RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder, VectorRef};
use snafu::{OptionExt, ResultExt};
use store_api::storage::ScanRequest;

use crate::error::{
    CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::Predicates;
use crate::system_schema::pg_catalog::{
    query_ctx, PGNamespaceOidMapRef, OID_COLUMN_NAME, PG_NAMESPACE,
};
use crate::system_schema::utils::tables::{string_column, u32_column};
use crate::system_schema::SystemTable;
use crate::CatalogManager;

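// `nspname` is the namespace (schema) name column; `INIT_CAPACITY` is the
// initial capacity of the per-column vector builders used below.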
const NSPNAME: &str = "nspname";
const INIT_CAPACITY: usize = 42;

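/// The `pg_catalog.pg_namespace` table. In PostgreSQL this table lists
/// namespaces (schemas); here every schema under `catalog_name` is exposed
/// as one row with its assigned `oid` and its `nspname`.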
pub(super) struct PGNamespace {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,

    oid_map: PGNamespaceOidMapRef,
}

impl PGNamespace {
    pub(super) fn new(
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
        oid_map: PGNamespaceOidMapRef,
    ) -> Self {
        Self {
            schema: Self::schema(),
            catalog_name,
            catalog_manager,
            oid_map,
        }
    }

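    /// The table schema: a `u32` `oid` column and a string `nspname` column.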
    fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            u32_column(OID_COLUMN_NAME),
            string_column(NSPNAME),
        ]))
    }

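    /// Creates a builder that shares this table's schema, catalog handle and
    /// oid map.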
    fn builder(&self) -> PGNamespaceBuilder {
        PGNamespaceBuilder::new(
            self.schema.clone(),
            self.catalog_name.clone(),
            self.catalog_manager.clone(),
            self.oid_map.clone(),
        )
    }
}

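// Manual `Debug` implementation that prints only `schema` and `catalog_name`
// and omits the remaining fields.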
impl fmt::Debug for PGNamespace {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("PGNamespace")
            .field("schema", &self.schema)
            .field("catalog_name", &self.catalog_name)
            .finish()
    }
}

impl SystemTable for PGNamespace {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn table_id(&self) -> table::metadata::TableId {
        PG_CATALOG_PG_NAMESPACE_TABLE_ID
    }

    fn table_name(&self) -> &'static str {
        PG_NAMESPACE
    }

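    /// Builds a one-shot record batch stream for a scan. The `ScanRequest` is
    /// forwarded to the builder so its predicates can filter rows while the
    /// batch is assembled.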
    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_namespace(Some(request))
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ));
        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}

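// DataFusion `PartitionStream` integration: same row construction as
// `to_stream`, but without a `ScanRequest`, so no predicates are pushed down.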
impl DfPartitionStream for PGNamespace {
    fn schema(&self) -> &ArrowSchemaRef {
        self.schema.arrow_schema()
    }

    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_namespace(None)
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ))
    }
}

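/// Builds the `pg_namespace` record batch by listing all schemas of the
/// catalog through the (weakly held) `CatalogManager`.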
struct PGNamespaceBuilder {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,
    namespace_oid_map: PGNamespaceOidMapRef,

    oid: UInt32VectorBuilder,
    nspname: StringVectorBuilder,
}

impl PGNamespaceBuilder {
    fn new(
        schema: SchemaRef,
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
        namespace_oid_map: PGNamespaceOidMapRef,
    ) -> Self {
        Self {
            schema,
            catalog_name,
            catalog_manager,
            namespace_oid_map,
            oid: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
            nspname: StringVectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

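    /// Upgrades the weak catalog manager, lists every schema name in the
    /// catalog, and appends one row per schema; predicates extracted from the
    /// optional `ScanRequest` are evaluated per row.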
    async fn make_namespace(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
        let catalog_name = self.catalog_name.clone();
        let catalog_manager = self
            .catalog_manager
            .upgrade()
            .context(UpgradeWeakCatalogManagerRefSnafu)?;
        let predicates = Predicates::from_scan_request(&request);
        for schema_name in catalog_manager
            .schema_names(&catalog_name, query_ctx())
            .await?
        {
            self.add_namespace(&predicates, &schema_name);
        }
        self.finish()
    }

    fn finish(&mut self) -> Result<RecordBatch> {
        let columns: Vec<VectorRef> =
            vec![Arc::new(self.oid.finish()), Arc::new(self.nspname.finish())];
        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }

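    /// Appends a single namespace row if it passes the pushed-down predicates.
    /// The oid comes from the shared oid map keyed by schema name.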
    fn add_namespace(&mut self, predicates: &Predicates, schema_name: &str) {
        let oid = self.namespace_oid_map.get_oid(schema_name);
        let row = [
            (OID_COLUMN_NAME, &Value::from(oid)),
            (NSPNAME, &Value::from(schema_name)),
        ];
        if !predicates.eval(&row) {
            return;
        }
        self.oid.push(Some(oid));
        self.nspname.push(Some(schema_name));
    }
}