catalog/
table_source.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use bytes::Bytes;
19use common_catalog::format_full_table_name;
20use common_query::logical_plan::{rename_logical_plan_columns, SubstraitPlanDecoderRef};
21use datafusion::common::{ResolvedTableReference, TableReference};
22use datafusion::datasource::view::ViewTable;
23use datafusion::datasource::{provider_as_source, TableProvider};
24use datafusion::logical_expr::TableSource;
25use itertools::Itertools;
26use session::context::QueryContextRef;
27use snafu::{ensure, OptionExt, ResultExt};
28use table::metadata::TableType;
29use table::table::adapter::DfTableProviderAdapter;
30pub mod dummy_catalog;
31use dummy_catalog::DummyCatalogList;
32use table::TableRef;
33
34use crate::error::{
35    CastManagerSnafu, DecodePlanSnafu, GetViewCacheSnafu, ProjectViewColumnsSnafu,
36    QueryAccessDeniedSnafu, Result, TableNotExistSnafu, ViewInfoNotFoundSnafu,
37    ViewPlanColumnsChangedSnafu,
38};
39use crate::kvbackend::KvBackendCatalogManager;
40use crate::CatalogManagerRef;
41
/// Resolves DataFusion table references into [`TableSource`]s through the catalog manager.
///
/// Every successfully resolved source is remembered in `resolved_tables`, so resolving the
/// same table again returns the cached source instead of querying the catalog manager.
pub struct DfTableSourceProvider {
    /// Catalog manager used to look up tables; it is also handed to the catalog list that is
    /// built for decoding view plans.
    catalog_manager: CatalogManagerRef,
    /// Already resolved sources, keyed by the string form of the resolved table reference.
    resolved_tables: HashMap<String, Arc<dyn TableSource>>,
    /// When `true`, fully qualified references naming a catalog other than `default_catalog`
    /// are rejected.
    disallow_cross_catalog_query: bool,
    /// Catalog used to complete bare and partial references; captured from the query context
    /// when the provider is created.
    default_catalog: String,
    /// Schema used to complete bare references; captured from the query context when the
    /// provider is created.
    default_schema: String,
    /// Query context passed along with table lookups.
    query_ctx: QueryContextRef,
    /// Decoder used to turn the encoded plan stored for a view into a logical plan.
    plan_decoder: SubstraitPlanDecoderRef,
    /// Forwarded to `rename_logical_plan_columns` when view columns are renamed.
    enable_ident_normalization: bool,
}
52
53impl DfTableSourceProvider {
54    pub fn new(
55        catalog_manager: CatalogManagerRef,
56        disallow_cross_catalog_query: bool,
57        query_ctx: QueryContextRef,
58        plan_decoder: SubstraitPlanDecoderRef,
59        enable_ident_normalization: bool,
60    ) -> Self {
61        Self {
62            catalog_manager,
63            disallow_cross_catalog_query,
64            resolved_tables: HashMap::new(),
65            default_catalog: query_ctx.current_catalog().to_owned(),
66            default_schema: query_ctx.current_schema(),
67            query_ctx,
68            plan_decoder,
69            enable_ident_normalization,
70        }
71    }
72
73    /// Returns the query context.
74    pub fn query_ctx(&self) -> &QueryContextRef {
75        &self.query_ctx
76    }
77
78    pub fn resolve_table_ref(&self, table_ref: TableReference) -> Result<ResolvedTableReference> {
79        if self.disallow_cross_catalog_query {
80            match &table_ref {
81                TableReference::Bare { .. } | TableReference::Partial { .. } => {}
82                TableReference::Full {
83                    catalog, schema, ..
84                } => {
85                    ensure!(
86                        catalog.as_ref() == self.default_catalog,
87                        QueryAccessDeniedSnafu {
88                            catalog: catalog.as_ref(),
89                            schema: schema.as_ref(),
90                        }
91                    );
92                }
93            };
94        }
95
96        Ok(table_ref.resolve(&self.default_catalog, &self.default_schema))
97    }
98
99    pub async fn resolve_table(
100        &mut self,
101        table_ref: TableReference,
102    ) -> Result<Arc<dyn TableSource>> {
103        let table_ref = self.resolve_table_ref(table_ref)?;
104
105        let resolved_name = table_ref.to_string();
106        if let Some(table) = self.resolved_tables.get(&resolved_name) {
107            return Ok(table.clone());
108        }
109
110        let catalog_name = table_ref.catalog.as_ref();
111        let schema_name = table_ref.schema.as_ref();
112        let table_name = table_ref.table.as_ref();
113
114        let table = self
115            .catalog_manager
116            .table(catalog_name, schema_name, table_name, Some(&self.query_ctx))
117            .await?
118            .with_context(|| TableNotExistSnafu {
119                table: format_full_table_name(catalog_name, schema_name, table_name),
120            })?;
121
122        let provider: Arc<dyn TableProvider> = if table.table_info().table_type == TableType::View {
123            self.create_view_provider(&table).await?
124        } else {
125            Arc::new(DfTableProviderAdapter::new(table))
126        };
127
128        let source = provider_as_source(provider);
129
130        let _ = self.resolved_tables.insert(resolved_name, source.clone());
131        Ok(source)
132    }
133
134    async fn create_view_provider(&self, table: &TableRef) -> Result<Arc<dyn TableProvider>> {
135        let catalog_manager = self
136            .catalog_manager
137            .as_any()
138            .downcast_ref::<KvBackendCatalogManager>()
139            .context(CastManagerSnafu)?;
140
141        let view_info = catalog_manager
142            .view_info_cache()?
143            .get(table.table_info().ident.table_id)
144            .await
145            .context(GetViewCacheSnafu)?
146            .context(ViewInfoNotFoundSnafu {
147                name: &table.table_info().name,
148            })?;
149
150        // Build the catalog list provider for deserialization.
151        let catalog_list = Arc::new(DummyCatalogList::new(self.catalog_manager.clone()));
152        let logical_plan = self
153            .plan_decoder
154            .decode(Bytes::from(view_info.view_info.clone()), catalog_list, true)
155            .await
156            .context(DecodePlanSnafu {
157                name: &table.table_info().name,
158            })?;
159
160        let columns: Vec<_> = view_info.columns.iter().map(|c| c.as_str()).collect();
161
162        let original_plan_columns: Vec<_> =
163            view_info.plan_columns.iter().map(|c| c.as_str()).collect();
164
165        let plan_columns: Vec<_> = logical_plan
166            .schema()
167            .columns()
168            .into_iter()
169            .map(|c| c.name)
170            .collect();
171
172        // Only check columns number, because substrait doesn't include aliases currently.
173        // See https://github.com/apache/datafusion/issues/10815#issuecomment-2158666881
174        // and https://github.com/apache/datafusion/issues/6489
175        // TODO(dennis): check column names
176        ensure!(
177            original_plan_columns.len() == plan_columns.len(),
178            ViewPlanColumnsChangedSnafu {
179                origin_names: original_plan_columns.iter().join(","),
180                actual_names: plan_columns.iter().join(","),
181            }
182        );
183
184        // We have to do `columns` projection here, because
185        // substrait doesn't include aliases neither for tables nor for columns:
186        // https://github.com/apache/datafusion/issues/10815#issuecomment-2158666881
187        let logical_plan = if !columns.is_empty() {
188            rename_logical_plan_columns(
189                self.enable_ident_normalization,
190                logical_plan,
191                plan_columns
192                    .iter()
193                    .map(|c| c.as_str())
194                    .zip(columns.into_iter())
195                    .collect(),
196            )
197            .context(ProjectViewColumnsSnafu)?
198        } else {
199            logical_plan
200        };
201
202        Ok(Arc::new(ViewTable::new(
203            logical_plan,
204            Some(view_info.definition.to_string()),
205        )))
206    }
207}
208
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
    use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
    use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
    use common_meta::key::TableMetadataManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_query::error::Result as QueryResult;
    use common_query::logical_plan::SubstraitPlanDecoder;
    use common_query::test_util::DummyDecoder;
    use datafusion::catalog::CatalogProviderList;
    use datafusion::logical_expr::builder::LogicalTableSource;
    use datafusion::logical_expr::{col, lit, LogicalPlan, LogicalPlanBuilder};
    use session::context::QueryContext;

    use super::*;
    use crate::information_schema::NoopInformationExtension;
    use crate::kvbackend::KvBackendCatalogManagerBuilder;
    use crate::memory::MemoryCatalogManager;

    /// With cross-catalog queries disallowed, only fully qualified references that name a
    /// catalog other than the default one are rejected.
    #[test]
    fn test_validate_table_ref() {
        let query_ctx = Arc::new(QueryContext::with("greptime", "public"));

        let table_provider = DfTableSourceProvider::new(
            MemoryCatalogManager::with_default_setup(),
            true,
            query_ctx.clone(),
            DummyDecoder::arc(),
            true,
        );

        // (reference to resolve, whether resolution is expected to succeed)
        let cases = [
            (TableReference::bare("table_name"), true),
            (TableReference::partial("public", "table_name"), true),
            (TableReference::partial("wrong_schema", "table_name"), true),
            (
                TableReference::full("greptime", "public", "table_name"),
                true,
            ),
            (
                TableReference::full("wrong_catalog", "public", "table_name"),
                false,
            ),
            (TableReference::partial("information_schema", "columns"), true),
            (
                TableReference::full("greptime", "information_schema", "columns"),
                true,
            ),
            (
                TableReference::full("dummy", "information_schema", "columns"),
                false,
            ),
            (
                TableReference::full("greptime", "greptime_private", "columns"),
                true,
            ),
        ];

        for (table_ref, expect_ok) in cases {
            let display = table_ref.to_string();
            let result = table_provider.resolve_table_ref(table_ref);
            assert_eq!(
                expect_ok,
                result.is_ok(),
                "unexpected result for `{display}`"
            );
        }
    }

    /// Plan decoder that ignores its input and always yields [`mock_plan`].
    struct MockDecoder;

    impl MockDecoder {
        pub fn arc() -> Arc<Self> {
            Arc::new(Self)
        }
    }

    #[async_trait::async_trait]
    impl SubstraitPlanDecoder for MockDecoder {
        async fn decode(
            &self,
            _message: bytes::Bytes,
            _catalog_list: Arc<dyn CatalogProviderList>,
            _optimize: bool,
        ) -> QueryResult<LogicalPlan> {
            Ok(mock_plan())
        }
    }

    /// Builds a scan of a `person(id, name)` table filtered by `id > 500`.
    fn mock_plan() -> LogicalPlan {
        let fields = vec![
            Field::new("id", DataType::Int32, true),
            Field::new("name", DataType::Utf8, true),
        ];
        let table_source = LogicalTableSource::new(SchemaRef::new(Schema::new(fields)));

        LogicalPlanBuilder::scan("person", Arc::new(table_source), None)
            .unwrap()
            .filter(col("id").gt(lit(500)))
            .unwrap()
            .build()
            .unwrap()
    }

    /// Resolving a view decodes its plan and projects the plan columns to the view columns.
    #[tokio::test]
    async fn test_resolve_view() {
        let query_ctx = Arc::new(QueryContext::with("greptime", "public"));
        let kv_backend = Arc::new(MemoryKvBackend::default());

        let fundamental_cache_registry = build_fundamental_cache_registry(kv_backend.clone());
        let layered_cache_builder = LayeredCacheRegistryBuilder::default()
            .add_cache_registry(CacheRegistryBuilder::default().build())
            .add_cache_registry(fundamental_cache_registry);
        let layered_cache_registry = Arc::new(
            with_default_composite_cache_registry(layered_cache_builder)
                .unwrap()
                .build(),
        );

        let catalog_manager = KvBackendCatalogManagerBuilder::new(
            Arc::new(NoopInformationExtension),
            kv_backend.clone(),
            layered_cache_registry,
        )
        .build();

        let table_metadata_manager = TableMetadataManager::new(kv_backend);
        let mut view_info = common_meta::key::test_utils::new_test_table_info(1024, vec![]);
        view_info.table_type = TableType::View;

        // Create view metadata
        table_metadata_manager
            .create_view_metadata(
                view_info.clone().into(),
                vec![1, 2, 3],
                HashSet::new(),
                vec!["a".to_string(), "b".to_string()],
                vec!["id".to_string(), "name".to_string()],
                "definition".to_string(),
            )
            .await
            .unwrap();

        let mut table_provider = DfTableSourceProvider::new(
            catalog_manager,
            true,
            query_ctx.clone(),
            MockDecoder::arc(),
            true,
        );

        // View not found
        let missing_ref = TableReference::bare("not_exists_view");
        assert!(table_provider.resolve_table(missing_ref).await.is_err());

        let view_ref = TableReference::bare(view_info.name);
        let source = table_provider.resolve_table(view_ref).await.unwrap();

        let expected = r#"
Projection: person.id AS a, person.name AS b
  Filter: person.id > Int32(500)
    TableScan: person"#;
        assert_eq!(
            expected,
            format!("\n{}", source.get_logical_plan().unwrap())
        );
    }
}