catalog/system_schema/pg_catalog/pg_database.rs

use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::PG_CATALOG_PG_DATABASE_TABLE_ID;
use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{DfSendableRecordBatchStream, RecordBatch};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder, VectorRef};
use snafu::{OptionExt, ResultExt};
use store_api::storage::ScanRequest;

use crate::error::{
    CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::Predicates;
use crate::system_schema::pg_catalog::pg_namespace::oid_map::PGNamespaceOidMapRef;
use crate::system_schema::pg_catalog::{query_ctx, OID_COLUMN_NAME, PG_DATABASE};
use crate::system_schema::utils::tables::{string_column, u32_column};
use crate::system_schema::SystemTable;
use crate::CatalogManager;

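/// Name of the `datname` column, mirroring PostgreSQL's `pg_database` catalog.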
pub const DATNAME: &str = "datname";

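/// Initial capacity for the column vector builders.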
const INIT_CAPACITY: usize = 42;

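/// The `pg_catalog.pg_database` system table.
///
/// Every schema known to the catalog manager becomes one row, so a query such
/// as `SELECT datname FROM pg_catalog.pg_database` enumerates the visible
/// schemas.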
pub(super) struct PGDatabase {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,

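    // Maps schema names to the numeric OIDs used in the `oid` column
    // (provided by `pg_namespace::oid_map`).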
    namespace_oid_map: PGNamespaceOidMapRef,
}

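// Manual `Debug` impl: only `schema` and `catalog_name` are shown.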
impl std::fmt::Debug for PGDatabase {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PGDatabase")
            .field("schema", &self.schema)
            .field("catalog_name", &self.catalog_name)
            .finish()
    }
}

impl PGDatabase {
    pub(super) fn new(
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
        namespace_oid_map: PGNamespaceOidMapRef,
    ) -> Self {
        Self {
            schema: Self::schema(),
            catalog_name,
            catalog_manager,
            namespace_oid_map,
        }
    }

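    /// The table schema: a `u32` `oid` column plus a string `datname` column.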
    fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            u32_column(OID_COLUMN_NAME),
            string_column(DATNAME),
        ]))
    }

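    /// Creates a row builder that shares this table's schema, catalog handle
    /// and OID map.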
    fn builder(&self) -> PGCDatabaseBuilder {
        PGCDatabaseBuilder::new(
            self.schema.clone(),
            self.catalog_name.clone(),
            self.catalog_manager.clone(),
            self.namespace_oid_map.clone(),
        )
    }
}

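// DataFusion `PartitionStream` entry point: emits the whole table in a single
// batch. No `ScanRequest` is available here, so `make_database(None)` applies
// no predicate pushdown.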
impl DfPartitionStream for PGDatabase {
    fn schema(&self) -> &ArrowSchemaRef {
        self.schema.arrow_schema()
    }

    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_database(None)
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ))
    }
}

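// Catalog-side `SystemTable` entry point: unlike `execute` above, `to_stream`
// receives a `ScanRequest`, so predicates can be pushed down into the builder.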
impl SystemTable for PGDatabase {
    fn table_id(&self) -> table::metadata::TableId {
        PG_CATALOG_PG_DATABASE_TABLE_ID
    }

    fn table_name(&self) -> &'static str {
        PG_DATABASE
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn to_stream(
        &self,
        request: ScanRequest,
    ) -> Result<common_recordbatch::SendableRecordBatchStream> {
        let schema = self.schema.arrow_schema().clone();
        let mut builder = self.builder();
        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
            schema,
            futures::stream::once(async move {
                builder
                    .make_database(Some(request))
                    .await
                    .map(|x| x.into_df_record_batch())
                    .map_err(Into::into)
            }),
        ));
        Ok(Box::pin(
            RecordBatchStreamAdapter::try_new(stream)
                .map_err(BoxedError::new)
                .context(InternalSnafu)?,
        ))
    }
}

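/// Row-by-row builder for `pg_catalog.pg_database`: `oid` comes from the
/// namespace OID map and `datname` is the schema name itself.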
struct PGCDatabaseBuilder {
    schema: SchemaRef,
    catalog_name: String,
    catalog_manager: Weak<dyn CatalogManager>,
    namespace_oid_map: PGNamespaceOidMapRef,

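    // One vector builder per output column.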
    oid: UInt32VectorBuilder,
    datname: StringVectorBuilder,
}

impl PGCDatabaseBuilder {
    fn new(
        schema: SchemaRef,
        catalog_name: String,
        catalog_manager: Weak<dyn CatalogManager>,
        namespace_oid_map: PGNamespaceOidMapRef,
    ) -> Self {
        Self {
            schema,
            catalog_name,
            catalog_manager,
            namespace_oid_map,

            oid: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
            datname: StringVectorBuilder::with_capacity(INIT_CAPACITY),
        }
    }

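    /// Collects the schema names under `catalog_name` and assembles them into
    /// a `RecordBatch`, honoring any predicates carried by the optional
    /// `ScanRequest`.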
    async fn make_database(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
        let catalog_name = self.catalog_name.clone();
        // The catalog manager is only held weakly; error out if it is gone.
        let catalog_manager = self
            .catalog_manager
            .upgrade()
            .context(UpgradeWeakCatalogManagerRefSnafu)?;
        let predicates = Predicates::from_scan_request(&request);
        for schema_name in catalog_manager
            .schema_names(&catalog_name, query_ctx())
            .await?
        {
            self.add_database(&predicates, &schema_name);
        }
        self.finish()
    }

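    /// Appends one row for `schema_name` unless the pushed-down predicates
    /// reject it.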
    fn add_database(&mut self, predicates: &Predicates, schema_name: &str) {
        let oid = self.namespace_oid_map.get_oid(schema_name);
        let row: [(&str, &Value); 2] = [
            (OID_COLUMN_NAME, &Value::from(oid)),
            (DATNAME, &Value::from(schema_name)),
        ];

        if !predicates.eval(&row) {
            return;
        }

        self.oid.push(Some(oid));
        self.datname.push(Some(schema_name));
    }

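    /// Drains the column builders into the final `RecordBatch`.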
    fn finish(&mut self) -> Result<RecordBatch> {
        let columns: Vec<VectorRef> =
            vec![Arc::new(self.oid.finish()), Arc::new(self.datname.finish())];
        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
    }
}