catalog/table_source/
dummy_catalog.rs1use std::any::Any;
18use std::fmt;
19use std::sync::Arc;
20
21use async_trait::async_trait;
22use common_catalog::format_full_table_name;
23use datafusion::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider};
24use datafusion::datasource::TableProvider;
25use session::context::QueryContextRef;
26use snafu::OptionExt;
27use table::table::adapter::DfTableProviderAdapter;
28
29use crate::CatalogManagerRef;
30use crate::error::TableNotExistSnafu;
31
32#[derive(Clone)]
34pub struct DummyCatalogList {
35 catalog_manager: CatalogManagerRef,
36 query_ctx: Option<QueryContextRef>,
37}
38
39impl DummyCatalogList {
40 pub fn new(catalog_manager: CatalogManagerRef) -> Self {
42 Self {
43 catalog_manager,
44 query_ctx: None,
45 }
46 }
47
48 pub fn new_with_query_ctx(
50 catalog_manager: CatalogManagerRef,
51 query_ctx: QueryContextRef,
52 ) -> Self {
53 Self {
54 catalog_manager,
55 query_ctx: Some(query_ctx),
56 }
57 }
58}
59
60impl fmt::Debug for DummyCatalogList {
61 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62 f.debug_struct("DummyCatalogList").finish()
63 }
64}
65
66impl CatalogProviderList for DummyCatalogList {
67 fn as_any(&self) -> &dyn Any {
68 self
69 }
70
71 fn register_catalog(
72 &self,
73 _name: String,
74 _catalog: Arc<dyn CatalogProvider>,
75 ) -> Option<Arc<dyn CatalogProvider>> {
76 None
77 }
78
79 fn catalog_names(&self) -> Vec<String> {
80 vec![]
81 }
82
83 fn catalog(&self, catalog_name: &str) -> Option<Arc<dyn CatalogProvider>> {
84 Some(Arc::new(DummyCatalogProvider {
85 catalog_name: catalog_name.to_string(),
86 catalog_manager: self.catalog_manager.clone(),
87 query_ctx: self.query_ctx.clone(),
88 }))
89 }
90}
91
92#[derive(Clone)]
94struct DummyCatalogProvider {
95 catalog_name: String,
96 catalog_manager: CatalogManagerRef,
97 query_ctx: Option<QueryContextRef>,
98}
99
100impl CatalogProvider for DummyCatalogProvider {
101 fn as_any(&self) -> &dyn Any {
102 self
103 }
104
105 fn schema_names(&self) -> Vec<String> {
106 vec![]
107 }
108
109 fn schema(&self, schema_name: &str) -> Option<Arc<dyn SchemaProvider>> {
110 Some(Arc::new(DummySchemaProvider {
111 catalog_name: self.catalog_name.clone(),
112 schema_name: schema_name.to_string(),
113 catalog_manager: self.catalog_manager.clone(),
114 query_ctx: self.query_ctx.clone(),
115 }))
116 }
117}
118
119impl fmt::Debug for DummyCatalogProvider {
120 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121 f.debug_struct("DummyCatalogProvider")
122 .field("catalog_name", &self.catalog_name)
123 .finish()
124 }
125}
126
127#[derive(Clone)]
129struct DummySchemaProvider {
130 catalog_name: String,
131 schema_name: String,
132 catalog_manager: CatalogManagerRef,
133 query_ctx: Option<QueryContextRef>,
134}
135
136#[async_trait]
137impl SchemaProvider for DummySchemaProvider {
138 fn as_any(&self) -> &dyn Any {
139 self
140 }
141
142 fn table_names(&self) -> Vec<String> {
143 vec![]
144 }
145
146 async fn table(&self, name: &str) -> datafusion::error::Result<Option<Arc<dyn TableProvider>>> {
147 let table = self
148 .catalog_manager
149 .table(
150 &self.catalog_name,
151 &self.schema_name,
152 name,
153 self.query_ctx.as_deref(),
154 )
155 .await?
156 .with_context(|| TableNotExistSnafu {
157 table: format_full_table_name(&self.catalog_name, &self.schema_name, name),
158 })?;
159
160 let table_provider: Arc<dyn TableProvider> = Arc::new(DfTableProviderAdapter::new(table));
161
162 Ok(Some(table_provider))
163 }
164
165 fn table_exist(&self, _name: &str) -> bool {
166 true
167 }
168}
169
170impl fmt::Debug for DummySchemaProvider {
171 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
172 f.debug_struct("DummySchemaProvider")
173 .field("catalog_name", &self.catalog_name)
174 .field("schema_name", &self.schema_name)
175 .finish()
176 }
177}