1use std::collections::HashMap;
16use std::sync::{Arc, Weak};
17
18use arrow_schema::SchemaRef;
19use async_trait::async_trait;
20use common_catalog::consts::{DEFAULT_CATALOG_NAME, PG_CATALOG_NAME, PG_CATALOG_TABLE_ID_START};
21use common_error::ext::BoxedError;
22use common_recordbatch::SendableRecordBatchStream;
23use common_recordbatch::adapter::RecordBatchStreamAdapter;
24use common_telemetry::warn;
25use datafusion::datasource::TableType;
26use datafusion::error::DataFusionError;
27use datafusion::execution::TaskContext;
28use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
29use datafusion_pg_catalog::pg_catalog::catalog_info::CatalogInfo;
30use datafusion_pg_catalog::pg_catalog::context::EmptyContextProvider;
31use datafusion_pg_catalog::pg_catalog::{
32 PG_CATALOG_TABLES, PgCatalogSchemaProvider, PgCatalogStaticTables, PgCatalogTable,
33};
34use snafu::ResultExt;
35use store_api::storage::ScanRequest;
36use table::TableRef;
37use table::metadata::TableId;
38
39use crate::CatalogManager;
40use crate::error::{InternalSnafu, ProjectSchemaSnafu, Result};
41use crate::system_schema::{
42 SystemSchemaProvider, SystemSchemaProviderInner, SystemTable, SystemTableRef,
43};
44
45pub struct PGCatalogProvider {
47 catalog_name: String,
48 inner: PgCatalogSchemaProvider<CatalogManagerWrapper, EmptyContextProvider>,
49 tables: HashMap<String, TableRef>,
50 table_ids: HashMap<&'static str, u32>,
51}
52
53impl SystemSchemaProvider for PGCatalogProvider {
54 fn tables(&self) -> &HashMap<String, TableRef> {
55 assert!(!self.tables.is_empty());
56
57 &self.tables
58 }
59}
60
61impl PGCatalogProvider {
62 pub fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
63 let static_tables =
66 PgCatalogStaticTables::try_new().expect("Failed to initialize static tables");
67 let inner = PgCatalogSchemaProvider::try_new(
68 CatalogManagerWrapper {
69 catalog_name: catalog_name.clone(),
70 catalog_manager,
71 },
72 Arc::new(static_tables),
73 EmptyContextProvider,
74 )
75 .expect("Failed to initialize PgCatalogSchemaProvider");
76
77 let mut table_ids = HashMap::new();
78 let mut table_id = PG_CATALOG_TABLE_ID_START;
79 for name in PG_CATALOG_TABLES {
80 table_ids.insert(*name, table_id);
81 table_id += 1;
82 }
83
84 let mut provider = Self {
85 catalog_name,
86 inner,
87 tables: HashMap::new(),
88 table_ids,
89 };
90 provider.build_tables();
91 provider
92 }
93
94 fn build_tables(&mut self) {
95 let mut tables = HashMap::new();
98 for name in PG_CATALOG_TABLES {
100 if let Some(table) = self.build_table(name) {
101 tables.insert(name.to_string(), table);
102 }
103 }
104
105 self.tables = tables;
106 }
107}
108
109impl SystemSchemaProviderInner for PGCatalogProvider {
110 fn schema_name() -> &'static str {
111 PG_CATALOG_NAME
112 }
113
114 fn system_table(&self, name: &str) -> Option<SystemTableRef> {
115 if let Some((table_name, table_id)) = self.table_ids.get_key_value(name) {
116 let table = self.inner.build_table_by_name(name).expect(name);
117
118 if let Some(table) = table {
119 if let Ok(system_table) = DFTableProviderAsSystemTable::try_new(
120 *table_id,
121 table_name,
122 table::metadata::TableType::Temporary,
123 table,
124 ) {
125 Some(Arc::new(system_table))
126 } else {
127 warn!("failed to create pg_catalog system table {}", name);
128 None
129 }
130 } else {
131 None
132 }
133 } else {
134 None
135 }
136 }
137
138 fn catalog_name(&self) -> &str {
139 &self.catalog_name
140 }
141}
142
143#[derive(Clone)]
144pub struct CatalogManagerWrapper {
145 catalog_name: String,
146 catalog_manager: Weak<dyn CatalogManager>,
147}
148
149impl CatalogManagerWrapper {
150 fn catalog_manager(&self) -> std::result::Result<Arc<dyn CatalogManager>, DataFusionError> {
151 self.catalog_manager.upgrade().ok_or_else(|| {
152 DataFusionError::Internal("Failed to access catalog manager".to_string())
153 })
154 }
155}
156
157impl std::fmt::Debug for CatalogManagerWrapper {
158 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159 f.debug_struct("CatalogManagerWrapper").finish()
160 }
161}
162
163#[async_trait]
164impl CatalogInfo for CatalogManagerWrapper {
165 async fn catalog_names(&self) -> std::result::Result<Vec<String>, DataFusionError> {
166 if self.catalog_name == DEFAULT_CATALOG_NAME {
167 CatalogManager::catalog_names(self.catalog_manager()?.as_ref())
168 .await
169 .map_err(|e| DataFusionError::External(Box::new(e)))
170 } else {
171 Ok(vec![self.catalog_name.clone()])
172 }
173 }
174
175 async fn schema_names(
176 &self,
177 catalog_name: &str,
178 ) -> std::result::Result<Option<Vec<String>>, DataFusionError> {
179 self.catalog_manager()?
180 .schema_names(catalog_name, None)
181 .await
182 .map(Some)
183 .map_err(|e| DataFusionError::External(Box::new(e)))
184 }
185
186 async fn table_names(
187 &self,
188 catalog_name: &str,
189 schema_name: &str,
190 ) -> std::result::Result<Option<Vec<String>>, DataFusionError> {
191 self.catalog_manager()?
192 .table_names(catalog_name, schema_name, None)
193 .await
194 .map(Some)
195 .map_err(|e| DataFusionError::External(Box::new(e)))
196 }
197
198 async fn table_schema(
199 &self,
200 catalog_name: &str,
201 schema_name: &str,
202 table_name: &str,
203 ) -> std::result::Result<Option<SchemaRef>, DataFusionError> {
204 let table = self
205 .catalog_manager()?
206 .table(catalog_name, schema_name, table_name, None)
207 .await
208 .map_err(|e| DataFusionError::External(Box::new(e)))?;
209
210 Ok(table.map(|t| t.schema().arrow_schema().clone()))
211 }
212
213 async fn table_type(
214 &self,
215 catalog_name: &str,
216 schema_name: &str,
217 table_name: &str,
218 ) -> std::result::Result<Option<TableType>, DataFusionError> {
219 let table = self
220 .catalog_manager()?
221 .table(catalog_name, schema_name, table_name, None)
222 .await
223 .map_err(|e| DataFusionError::External(Box::new(e)))?;
224
225 Ok(table.map(|t| t.table_type().into()))
226 }
227}
228
229struct DFTableProviderAsSystemTable {
230 pub table_id: TableId,
231 pub table_name: &'static str,
232 pub table_type: table::metadata::TableType,
233 pub schema: Arc<datatypes::schema::Schema>,
234 pub table_provider: PgCatalogTable,
235}
236
237impl DFTableProviderAsSystemTable {
238 pub fn try_new(
239 table_id: TableId,
240 table_name: &'static str,
241 table_type: table::metadata::TableType,
242 table_provider: PgCatalogTable,
243 ) -> Result<Self> {
244 let arrow_schema = table_provider.schema();
245 let schema = Arc::new(arrow_schema.try_into().context(ProjectSchemaSnafu)?);
246 Ok(Self {
247 table_id,
248 table_name,
249 table_type,
250 schema,
251 table_provider,
252 })
253 }
254}
255
256impl SystemTable for DFTableProviderAsSystemTable {
257 fn table_id(&self) -> TableId {
258 self.table_id
259 }
260
261 fn table_name(&self) -> &'static str {
262 self.table_name
263 }
264
265 fn schema(&self) -> Arc<datatypes::schema::Schema> {
266 self.schema.clone()
267 }
268
269 fn table_type(&self) -> table::metadata::TableType {
270 self.table_type
271 }
272
273 fn to_stream(&self, _request: ScanRequest) -> Result<SendableRecordBatchStream> {
274 match &self.table_provider {
275 PgCatalogTable::Static(table) => {
276 let schema = self.schema.arrow_schema().clone();
277 let data = table
278 .data()
279 .iter()
280 .map(|rb| Ok(rb.clone()))
281 .collect::<Vec<_>>();
282 let stream = Box::pin(DfRecordBatchStreamAdapter::new(
283 schema,
284 futures::stream::iter(data),
285 ));
286 Ok(Box::pin(
287 RecordBatchStreamAdapter::try_new(stream)
288 .map_err(BoxedError::new)
289 .context(InternalSnafu)?,
290 ))
291 }
292
293 PgCatalogTable::Dynamic(table) => {
294 let stream = table.execute(Arc::new(TaskContext::default()));
295 Ok(Box::pin(
296 RecordBatchStreamAdapter::try_new(stream)
297 .map_err(BoxedError::new)
298 .context(InternalSnafu)?,
299 ))
300 }
301
302 PgCatalogTable::Empty(_) => {
303 let schema = self.schema.arrow_schema().clone();
304 let stream = Box::pin(DfRecordBatchStreamAdapter::new(
305 schema,
306 futures::stream::iter(vec![]),
307 ));
308 Ok(Box::pin(
309 RecordBatchStreamAdapter::try_new(stream)
310 .map_err(BoxedError::new)
311 .context(InternalSnafu)?,
312 ))
313 }
314 }
315 }
316}