Skip to main content

catalog/table_source/
dummy_catalog.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Dummy catalog for region server.
16
17use std::any::Any;
18use std::fmt;
19use std::sync::Arc;
20
21use async_trait::async_trait;
22use common_catalog::format_full_table_name;
23use datafusion::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider};
24use datafusion::datasource::TableProvider;
25use session::context::QueryContextRef;
26use snafu::OptionExt;
27use table::table::adapter::DfTableProviderAdapter;
28
29use crate::CatalogManagerRef;
30use crate::error::TableNotExistSnafu;
31
/// A [`CatalogProviderList`] that keeps no catalog registry of its own.
///
/// Every catalog name is accepted, and table resolution is ultimately delegated to the
/// wrapped catalog manager ([`CatalogManagerRef`]) unconditionally.
#[derive(Clone)]
pub struct DummyCatalogList {
    /// Catalog manager handed down to the providers this list creates.
    catalog_manager: CatalogManagerRef,
    /// Optional query context handed down alongside the catalog manager.
    query_ctx: Option<QueryContextRef>,
}
38
39impl DummyCatalogList {
40    /// Creates a new catalog list with the given catalog manager (no query context).
41    pub fn new(catalog_manager: CatalogManagerRef) -> Self {
42        Self {
43            catalog_manager,
44            query_ctx: None,
45        }
46    }
47
48    /// Creates a new catalog list with the given catalog manager and query context.
49    pub fn new_with_query_ctx(
50        catalog_manager: CatalogManagerRef,
51        query_ctx: QueryContextRef,
52    ) -> Self {
53        Self {
54            catalog_manager,
55            query_ctx: Some(query_ctx),
56        }
57    }
58}
59
60impl fmt::Debug for DummyCatalogList {
61    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62        f.debug_struct("DummyCatalogList").finish()
63    }
64}
65
66impl CatalogProviderList for DummyCatalogList {
67    fn as_any(&self) -> &dyn Any {
68        self
69    }
70
71    fn register_catalog(
72        &self,
73        _name: String,
74        _catalog: Arc<dyn CatalogProvider>,
75    ) -> Option<Arc<dyn CatalogProvider>> {
76        None
77    }
78
79    fn catalog_names(&self) -> Vec<String> {
80        vec![]
81    }
82
83    fn catalog(&self, catalog_name: &str) -> Option<Arc<dyn CatalogProvider>> {
84        Some(Arc::new(DummyCatalogProvider {
85            catalog_name: catalog_name.to_string(),
86            catalog_manager: self.catalog_manager.clone(),
87            query_ctx: self.query_ctx.clone(),
88        }))
89    }
90}
91
/// Catalog-level stand-in handed out by [`DummyCatalogList`].
///
/// It only remembers which catalog was asked for and passes the catalog manager and
/// query context on to the schema providers it creates.
#[derive(Clone)]
struct DummyCatalogProvider {
    /// Name of the catalog this provider was requested for.
    catalog_name: String,
    /// Catalog manager passed on to schema providers.
    catalog_manager: CatalogManagerRef,
    /// Optional query context passed on to schema providers.
    query_ctx: Option<QueryContextRef>,
}
99
100impl CatalogProvider for DummyCatalogProvider {
101    fn as_any(&self) -> &dyn Any {
102        self
103    }
104
105    fn schema_names(&self) -> Vec<String> {
106        vec![]
107    }
108
109    fn schema(&self, schema_name: &str) -> Option<Arc<dyn SchemaProvider>> {
110        Some(Arc::new(DummySchemaProvider {
111            catalog_name: self.catalog_name.clone(),
112            schema_name: schema_name.to_string(),
113            catalog_manager: self.catalog_manager.clone(),
114            query_ctx: self.query_ctx.clone(),
115        }))
116    }
117}
118
119impl fmt::Debug for DummyCatalogProvider {
120    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121        f.debug_struct("DummyCatalogProvider")
122            .field("catalog_name", &self.catalog_name)
123            .finish()
124    }
125}
126
/// Schema-level stand-in created by [`DummyCatalogProvider`].
///
/// It holds the catalog and schema names it was requested for, plus the catalog manager
/// and query context used to look tables up.
#[derive(Clone)]
struct DummySchemaProvider {
    /// Name of the catalog this schema belongs to.
    catalog_name: String,
    /// Name of the schema this provider was requested for.
    schema_name: String,
    /// Catalog manager that performs the actual table lookup.
    catalog_manager: CatalogManagerRef,
    /// Optional query context supplied to the table lookup.
    query_ctx: Option<QueryContextRef>,
}
135
136#[async_trait]
137impl SchemaProvider for DummySchemaProvider {
138    fn as_any(&self) -> &dyn Any {
139        self
140    }
141
142    fn table_names(&self) -> Vec<String> {
143        vec![]
144    }
145
146    async fn table(&self, name: &str) -> datafusion::error::Result<Option<Arc<dyn TableProvider>>> {
147        let table = self
148            .catalog_manager
149            .table(
150                &self.catalog_name,
151                &self.schema_name,
152                name,
153                self.query_ctx.as_deref(),
154            )
155            .await?
156            .with_context(|| TableNotExistSnafu {
157                table: format_full_table_name(&self.catalog_name, &self.schema_name, name),
158            })?;
159
160        let table_provider: Arc<dyn TableProvider> = Arc::new(DfTableProviderAdapter::new(table));
161
162        Ok(Some(table_provider))
163    }
164
165    fn table_exist(&self, _name: &str) -> bool {
166        true
167    }
168}
169
170impl fmt::Debug for DummySchemaProvider {
171    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
172        f.debug_struct("DummySchemaProvider")
173            .field("catalog_name", &self.catalog_name)
174            .field("schema_name", &self.schema_name)
175            .finish()
176    }
177}