1use std::collections::HashMap;
16use std::sync::{Arc, Weak};
17
18use arrow_schema::SchemaRef;
19use async_trait::async_trait;
20use common_catalog::consts::{DEFAULT_CATALOG_NAME, PG_CATALOG_NAME, PG_CATALOG_TABLE_ID_START};
21use common_error::ext::BoxedError;
22use common_recordbatch::SendableRecordBatchStream;
23use common_recordbatch::adapter::RecordBatchStreamAdapter;
24use common_telemetry::warn;
25use datafusion::datasource::TableType;
26use datafusion::error::DataFusionError;
27use datafusion::execution::TaskContext;
28use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
29use datafusion_pg_catalog::pg_catalog::catalog_info::CatalogInfo;
30use datafusion_pg_catalog::pg_catalog::{
31 PG_CATALOG_TABLES, PgCatalogSchemaProvider, PgCatalogStaticTables, PgCatalogTable,
32};
33use snafu::ResultExt;
34use store_api::storage::ScanRequest;
35use table::TableRef;
36use table::metadata::TableId;
37
38use crate::CatalogManager;
39use crate::error::{InternalSnafu, ProjectSchemaSnafu, Result};
40use crate::system_schema::{
41 SystemSchemaProvider, SystemSchemaProviderInner, SystemTable, SystemTableRef,
42};
43
44pub struct PGCatalogProvider {
46 catalog_name: String,
47 inner: PgCatalogSchemaProvider<CatalogManagerWrapper>,
48 tables: HashMap<String, TableRef>,
49 table_ids: HashMap<&'static str, u32>,
50}
51
52impl SystemSchemaProvider for PGCatalogProvider {
53 fn tables(&self) -> &HashMap<String, TableRef> {
54 assert!(!self.tables.is_empty());
55
56 &self.tables
57 }
58}
59
60impl PGCatalogProvider {
61 pub fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
62 let static_tables =
65 PgCatalogStaticTables::try_new().expect("Failed to initialize static tables");
66 let inner = PgCatalogSchemaProvider::try_new(
67 CatalogManagerWrapper {
68 catalog_name: catalog_name.clone(),
69 catalog_manager,
70 },
71 Arc::new(static_tables),
72 )
73 .expect("Failed to initialize PgCatalogSchemaProvider");
74
75 let mut table_ids = HashMap::new();
76 let mut table_id = PG_CATALOG_TABLE_ID_START;
77 for name in PG_CATALOG_TABLES {
78 table_ids.insert(*name, table_id);
79 table_id += 1;
80 }
81
82 let mut provider = Self {
83 catalog_name,
84 inner,
85 tables: HashMap::new(),
86 table_ids,
87 };
88 provider.build_tables();
89 provider
90 }
91
92 fn build_tables(&mut self) {
93 let mut tables = HashMap::new();
96 for name in PG_CATALOG_TABLES {
98 if let Some(table) = self.build_table(name) {
99 tables.insert(name.to_string(), table);
100 }
101 }
102
103 self.tables = tables;
104 }
105}
106
107impl SystemSchemaProviderInner for PGCatalogProvider {
108 fn schema_name() -> &'static str {
109 PG_CATALOG_NAME
110 }
111
112 fn system_table(&self, name: &str) -> Option<SystemTableRef> {
113 if let Some((table_name, table_id)) = self.table_ids.get_key_value(name) {
114 let table = self.inner.build_table_by_name(name).expect(name);
115
116 if let Some(table) = table {
117 if let Ok(system_table) = DFTableProviderAsSystemTable::try_new(
118 *table_id,
119 table_name,
120 table::metadata::TableType::Temporary,
121 table,
122 ) {
123 Some(Arc::new(system_table))
124 } else {
125 warn!("failed to create pg_catalog system table {}", name);
126 None
127 }
128 } else {
129 None
130 }
131 } else {
132 None
133 }
134 }
135
136 fn catalog_name(&self) -> &str {
137 &self.catalog_name
138 }
139}
140
141#[derive(Clone)]
142pub struct CatalogManagerWrapper {
143 catalog_name: String,
144 catalog_manager: Weak<dyn CatalogManager>,
145}
146
147impl CatalogManagerWrapper {
148 fn catalog_manager(&self) -> std::result::Result<Arc<dyn CatalogManager>, DataFusionError> {
149 self.catalog_manager.upgrade().ok_or_else(|| {
150 DataFusionError::Internal("Failed to access catalog manager".to_string())
151 })
152 }
153}
154
155impl std::fmt::Debug for CatalogManagerWrapper {
156 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157 f.debug_struct("CatalogManagerWrapper").finish()
158 }
159}
160
161#[async_trait]
162impl CatalogInfo for CatalogManagerWrapper {
163 async fn catalog_names(&self) -> std::result::Result<Vec<String>, DataFusionError> {
164 if self.catalog_name == DEFAULT_CATALOG_NAME {
165 CatalogManager::catalog_names(self.catalog_manager()?.as_ref())
166 .await
167 .map_err(|e| DataFusionError::External(Box::new(e)))
168 } else {
169 Ok(vec![self.catalog_name.clone()])
170 }
171 }
172
173 async fn schema_names(
174 &self,
175 catalog_name: &str,
176 ) -> std::result::Result<Option<Vec<String>>, DataFusionError> {
177 self.catalog_manager()?
178 .schema_names(catalog_name, None)
179 .await
180 .map(Some)
181 .map_err(|e| DataFusionError::External(Box::new(e)))
182 }
183
184 async fn table_names(
185 &self,
186 catalog_name: &str,
187 schema_name: &str,
188 ) -> std::result::Result<Option<Vec<String>>, DataFusionError> {
189 self.catalog_manager()?
190 .table_names(catalog_name, schema_name, None)
191 .await
192 .map(Some)
193 .map_err(|e| DataFusionError::External(Box::new(e)))
194 }
195
196 async fn table_schema(
197 &self,
198 catalog_name: &str,
199 schema_name: &str,
200 table_name: &str,
201 ) -> std::result::Result<Option<SchemaRef>, DataFusionError> {
202 let table = self
203 .catalog_manager()?
204 .table(catalog_name, schema_name, table_name, None)
205 .await
206 .map_err(|e| DataFusionError::External(Box::new(e)))?;
207
208 Ok(table.map(|t| t.schema().arrow_schema().clone()))
209 }
210
211 async fn table_type(
212 &self,
213 catalog_name: &str,
214 schema_name: &str,
215 table_name: &str,
216 ) -> std::result::Result<Option<TableType>, DataFusionError> {
217 let table = self
218 .catalog_manager()?
219 .table(catalog_name, schema_name, table_name, None)
220 .await
221 .map_err(|e| DataFusionError::External(Box::new(e)))?;
222
223 Ok(table.map(|t| t.table_type().into()))
224 }
225}
226
227struct DFTableProviderAsSystemTable {
228 pub table_id: TableId,
229 pub table_name: &'static str,
230 pub table_type: table::metadata::TableType,
231 pub schema: Arc<datatypes::schema::Schema>,
232 pub table_provider: PgCatalogTable,
233}
234
235impl DFTableProviderAsSystemTable {
236 pub fn try_new(
237 table_id: TableId,
238 table_name: &'static str,
239 table_type: table::metadata::TableType,
240 table_provider: PgCatalogTable,
241 ) -> Result<Self> {
242 let arrow_schema = table_provider.schema();
243 let schema = Arc::new(arrow_schema.try_into().context(ProjectSchemaSnafu)?);
244 Ok(Self {
245 table_id,
246 table_name,
247 table_type,
248 schema,
249 table_provider,
250 })
251 }
252}
253
254impl SystemTable for DFTableProviderAsSystemTable {
255 fn table_id(&self) -> TableId {
256 self.table_id
257 }
258
259 fn table_name(&self) -> &'static str {
260 self.table_name
261 }
262
263 fn schema(&self) -> Arc<datatypes::schema::Schema> {
264 self.schema.clone()
265 }
266
267 fn table_type(&self) -> table::metadata::TableType {
268 self.table_type
269 }
270
271 fn to_stream(&self, _request: ScanRequest) -> Result<SendableRecordBatchStream> {
272 match &self.table_provider {
273 PgCatalogTable::Static(table) => {
274 let schema = self.schema.arrow_schema().clone();
275 let data = table
276 .data()
277 .iter()
278 .map(|rb| Ok(rb.clone()))
279 .collect::<Vec<_>>();
280 let stream = Box::pin(DfRecordBatchStreamAdapter::new(
281 schema,
282 futures::stream::iter(data),
283 ));
284 Ok(Box::pin(
285 RecordBatchStreamAdapter::try_new(stream)
286 .map_err(BoxedError::new)
287 .context(InternalSnafu)?,
288 ))
289 }
290
291 PgCatalogTable::Dynamic(table) => {
292 let stream = table.execute(Arc::new(TaskContext::default()));
293 Ok(Box::pin(
294 RecordBatchStreamAdapter::try_new(stream)
295 .map_err(BoxedError::new)
296 .context(InternalSnafu)?,
297 ))
298 }
299
300 PgCatalogTable::Empty(_) => {
301 let schema = self.schema.arrow_schema().clone();
302 let stream = Box::pin(DfRecordBatchStreamAdapter::new(
303 schema,
304 futures::stream::iter(vec![]),
305 ));
306 Ok(Box::pin(
307 RecordBatchStreamAdapter::try_new(stream)
308 .map_err(BoxedError::new)
309 .context(InternalSnafu)?,
310 ))
311 }
312 }
313 }
314}