catalog/table_source/
dummy_catalog.rs1use std::any::Any;
18use std::fmt;
19use std::sync::Arc;
20
21use async_trait::async_trait;
22use common_catalog::format_full_table_name;
23use datafusion::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider};
24use datafusion::datasource::TableProvider;
25use snafu::OptionExt;
26use table::table::adapter::DfTableProviderAdapter;
27
28use crate::error::TableNotExistSnafu;
29use crate::CatalogManagerRef;
30
/// A [`CatalogProviderList`] implementation that wraps a catalog manager.
///
/// It keeps no catalog state of its own; the only thing it stores is the
/// manager handle that is handed down to the providers it creates.
#[derive(Clone)]
pub struct DummyCatalogList {
    /// Handle to the catalog manager, cloned into every provider created by
    /// this list.
    catalog_manager: CatalogManagerRef,
}
36
37impl DummyCatalogList {
38 pub fn new(catalog_manager: CatalogManagerRef) -> Self {
40 Self { catalog_manager }
41 }
42}
43
44impl fmt::Debug for DummyCatalogList {
45 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46 f.debug_struct("DummyCatalogList").finish()
47 }
48}
49
50impl CatalogProviderList for DummyCatalogList {
51 fn as_any(&self) -> &dyn Any {
52 self
53 }
54
55 fn register_catalog(
56 &self,
57 _name: String,
58 _catalog: Arc<dyn CatalogProvider>,
59 ) -> Option<Arc<dyn CatalogProvider>> {
60 None
61 }
62
63 fn catalog_names(&self) -> Vec<String> {
64 vec![]
65 }
66
67 fn catalog(&self, catalog_name: &str) -> Option<Arc<dyn CatalogProvider>> {
68 Some(Arc::new(DummyCatalogProvider {
69 catalog_name: catalog_name.to_string(),
70 catalog_manager: self.catalog_manager.clone(),
71 }))
72 }
73}
74
/// A [`CatalogProvider`] bound to one catalog name and a catalog manager.
#[derive(Clone)]
struct DummyCatalogProvider {
    /// Name of the catalog this provider was requested for.
    catalog_name: String,
    /// Handle to the catalog manager, passed on to the schema providers
    /// created by this provider.
    catalog_manager: CatalogManagerRef,
}
81
82impl CatalogProvider for DummyCatalogProvider {
83 fn as_any(&self) -> &dyn Any {
84 self
85 }
86
87 fn schema_names(&self) -> Vec<String> {
88 vec![]
89 }
90
91 fn schema(&self, schema_name: &str) -> Option<Arc<dyn SchemaProvider>> {
92 Some(Arc::new(DummySchemaProvider {
93 catalog_name: self.catalog_name.clone(),
94 schema_name: schema_name.to_string(),
95 catalog_manager: self.catalog_manager.clone(),
96 }))
97 }
98}
99
100impl fmt::Debug for DummyCatalogProvider {
101 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
102 f.debug_struct("DummyCatalogProvider")
103 .field("catalog_name", &self.catalog_name)
104 .finish()
105 }
106}
107
/// A [`SchemaProvider`] bound to one catalog/schema pair and a catalog
/// manager.
#[derive(Clone)]
struct DummySchemaProvider {
    /// Name of the catalog this schema belongs to.
    catalog_name: String,
    /// Name of the schema this provider was requested for.
    schema_name: String,
    /// Handle to the catalog manager used for table lookups.
    catalog_manager: CatalogManagerRef,
}
115
116#[async_trait]
117impl SchemaProvider for DummySchemaProvider {
118 fn as_any(&self) -> &dyn Any {
119 self
120 }
121
122 fn table_names(&self) -> Vec<String> {
123 vec![]
124 }
125
126 async fn table(&self, name: &str) -> datafusion::error::Result<Option<Arc<dyn TableProvider>>> {
127 let table = self
128 .catalog_manager
129 .table(&self.catalog_name, &self.schema_name, name, None)
130 .await?
131 .with_context(|| TableNotExistSnafu {
132 table: format_full_table_name(&self.catalog_name, &self.schema_name, name),
133 })?;
134
135 let table_provider: Arc<dyn TableProvider> = Arc::new(DfTableProviderAdapter::new(table));
136
137 Ok(Some(table_provider))
138 }
139
140 fn table_exist(&self, _name: &str) -> bool {
141 true
142 }
143}
144
145impl fmt::Debug for DummySchemaProvider {
146 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
147 f.debug_struct("DummySchemaProvider")
148 .field("catalog_name", &self.catalog_name)
149 .field("schema_name", &self.schema_name)
150 .finish()
151 }
152}