Skip to main content

catalog/
table_source.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use common_catalog::format_full_table_name;
19use common_query::logical_plan::{SubstraitPlanDecoderRef, rename_logical_plan_columns};
20use datafusion::common::{ResolvedTableReference, TableReference};
21use datafusion::datasource::view::ViewTable;
22use datafusion::datasource::{TableProvider, provider_as_source};
23use datafusion::logical_expr::TableSource;
24use itertools::Itertools;
25use session::context::QueryContextRef;
26use snafu::{OptionExt, ResultExt, ensure};
27use table::metadata::TableType;
28use table::table::adapter::DfTableProviderAdapter;
29pub mod dummy_catalog;
30use dummy_catalog::DummyCatalogList;
31use table::TableRef;
32
33use crate::CatalogManagerRef;
34use crate::error::{
35    CastManagerSnafu, DecodePlanSnafu, GetViewCacheSnafu, ProjectViewColumnsSnafu,
36    QueryAccessDeniedSnafu, Result, TableNotExistSnafu, ViewInfoNotFoundSnafu,
37    ViewPlanColumnsChangedSnafu,
38};
39use crate::kvbackend::KvBackendCatalogManager;
40
41pub struct DfTableSourceProvider {
42    catalog_manager: CatalogManagerRef,
43    resolved_tables: HashMap<String, Arc<dyn TableSource>>,
44    disallow_cross_catalog_query: bool,
45    default_catalog: String,
46    default_schema: String,
47    query_ctx: QueryContextRef,
48    plan_decoder: SubstraitPlanDecoderRef,
49    enable_ident_normalization: bool,
50}
51
52impl DfTableSourceProvider {
53    pub fn new(
54        catalog_manager: CatalogManagerRef,
55        disallow_cross_catalog_query: bool,
56        query_ctx: QueryContextRef,
57        plan_decoder: SubstraitPlanDecoderRef,
58        enable_ident_normalization: bool,
59    ) -> Self {
60        Self {
61            catalog_manager,
62            disallow_cross_catalog_query,
63            resolved_tables: HashMap::new(),
64            default_catalog: query_ctx.current_catalog().to_owned(),
65            default_schema: query_ctx.current_schema(),
66            query_ctx,
67            plan_decoder,
68            enable_ident_normalization,
69        }
70    }
71
72    /// Returns the query context.
73    pub fn query_ctx(&self) -> &QueryContextRef {
74        &self.query_ctx
75    }
76
77    pub fn resolve_table_ref(&self, table_ref: TableReference) -> Result<ResolvedTableReference> {
78        if self.disallow_cross_catalog_query {
79            match &table_ref {
80                TableReference::Bare { .. } | TableReference::Partial { .. } => {}
81                TableReference::Full {
82                    catalog, schema, ..
83                } => {
84                    ensure!(
85                        catalog.as_ref() == self.default_catalog,
86                        QueryAccessDeniedSnafu {
87                            catalog: catalog.as_ref(),
88                            schema: schema.as_ref(),
89                        }
90                    );
91                }
92            };
93        }
94
95        Ok(table_ref.resolve(&self.default_catalog, &self.default_schema))
96    }
97
98    pub async fn resolve_table(
99        &mut self,
100        table_ref: TableReference,
101    ) -> Result<Arc<dyn TableSource>> {
102        let table_ref = self.resolve_table_ref(table_ref)?;
103
104        let resolved_name = table_ref.to_string();
105        if let Some(table) = self.resolved_tables.get(&resolved_name) {
106            return Ok(table.clone());
107        }
108
109        let catalog_name = table_ref.catalog.as_ref();
110        let schema_name = table_ref.schema.as_ref();
111        let table_name = table_ref.table.as_ref();
112
113        let table = self
114            .catalog_manager
115            .table(catalog_name, schema_name, table_name, Some(&self.query_ctx))
116            .await?
117            .with_context(|| TableNotExistSnafu {
118                table: format_full_table_name(catalog_name, schema_name, table_name),
119            })?;
120
121        let provider: Arc<dyn TableProvider> = if table.table_info().table_type == TableType::View {
122            self.create_view_provider(&table).await?
123        } else {
124            Arc::new(DfTableProviderAdapter::new(table))
125        };
126
127        let source = provider_as_source(provider);
128
129        let _ = self.resolved_tables.insert(resolved_name, source.clone());
130        Ok(source)
131    }
132
133    async fn create_view_provider(&self, table: &TableRef) -> Result<Arc<dyn TableProvider>> {
134        let catalog_manager = self
135            .catalog_manager
136            .as_any()
137            .downcast_ref::<KvBackendCatalogManager>()
138            .context(CastManagerSnafu)?;
139
140        let view_info = catalog_manager
141            .view_info_cache()?
142            .get(table.table_info().ident.table_id)
143            .await
144            .context(GetViewCacheSnafu)?
145            .context(ViewInfoNotFoundSnafu {
146                name: &table.table_info().name,
147            })?;
148
149        // Build the catalog list provider for deserialization.
150        let catalog_list = Arc::new(DummyCatalogList::new(self.catalog_manager.clone()));
151        let logical_plan = self
152            .plan_decoder
153            .decode(view_info.view_info.clone().into(), catalog_list, false)
154            .await
155            .context(DecodePlanSnafu {
156                name: &table.table_info().name,
157            })?;
158
159        let columns: Vec<_> = view_info.columns.iter().map(|c| c.as_str()).collect();
160
161        let original_plan_columns: Vec<_> =
162            view_info.plan_columns.iter().map(|c| c.as_str()).collect();
163
164        let plan_columns: Vec<_> = logical_plan
165            .schema()
166            .columns()
167            .into_iter()
168            .map(|c| c.name)
169            .collect();
170
171        // Only check columns number, because substrait doesn't include aliases currently.
172        // See https://github.com/apache/datafusion/issues/10815#issuecomment-2158666881
173        // and https://github.com/apache/datafusion/issues/6489
174        // TODO(dennis): check column names
175        ensure!(
176            original_plan_columns.len() == plan_columns.len(),
177            ViewPlanColumnsChangedSnafu {
178                origin_names: original_plan_columns.iter().join(","),
179                actual_names: plan_columns.iter().join(","),
180            }
181        );
182
183        // We have to do `columns` projection here, because
184        // substrait doesn't include aliases neither for tables nor for columns:
185        // https://github.com/apache/datafusion/issues/10815#issuecomment-2158666881
186        let logical_plan = if !columns.is_empty() {
187            rename_logical_plan_columns(
188                self.enable_ident_normalization,
189                logical_plan,
190                plan_columns
191                    .iter()
192                    .map(|c| c.as_str())
193                    .zip(columns)
194                    .collect(),
195            )
196            .context(ProjectViewColumnsSnafu)?
197        } else {
198            logical_plan
199        };
200
201        Ok(Arc::new(ViewTable::new(
202            logical_plan,
203            Some(view_info.definition.clone()),
204        )))
205    }
206}
207
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
    use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
    use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
    use common_meta::key::TableMetadataManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_query::error::Result as QueryResult;
    use common_query::logical_plan::SubstraitPlanDecoder;
    use common_query::test_util::DummyDecoder;
    use datafusion::catalog::CatalogProviderList;
    use datafusion::logical_expr::builder::LogicalTableSource;
    use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder, col, lit};
    use session::context::QueryContext;

    use super::*;
    use crate::information_schema::NoopInformationExtension;
    use crate::kvbackend::KvBackendCatalogManagerBuilder;
    use crate::memory::MemoryCatalogManager;

    /// Decoder stub that ignores its input and always yields [`mock_plan`].
    struct MockDecoder;

    impl MockDecoder {
        pub fn arc() -> Arc<Self> {
            Arc::new(MockDecoder)
        }
    }

    #[async_trait::async_trait]
    impl SubstraitPlanDecoder for MockDecoder {
        async fn decode(
            &self,
            _message: bytes::Bytes,
            _catalog_list: Arc<dyn CatalogProviderList>,
            _optimize: bool,
        ) -> QueryResult<LogicalPlan> {
            Ok(mock_plan())
        }
    }

    /// Builds `SELECT * FROM person WHERE id > 500` over a two-column `person` table.
    fn mock_plan() -> LogicalPlan {
        let person_schema = SchemaRef::new(Schema::new(vec![
            Field::new("id", DataType::Int32, true),
            Field::new("name", DataType::Utf8, true),
        ]));
        let person = Arc::new(LogicalTableSource::new(person_schema));

        LogicalPlanBuilder::scan("person", person, None)
            .unwrap()
            .filter(col("id").gt(lit(500)))
            .unwrap()
            .build()
            .unwrap()
    }

    /// With cross-catalog queries disallowed, only fully qualified references that
    /// name a foreign catalog are rejected.
    #[test]
    fn test_validate_table_ref() {
        let query_ctx = Arc::new(QueryContext::with("greptime", "public"));

        let table_provider = DfTableSourceProvider::new(
            MemoryCatalogManager::with_default_setup(),
            true,
            query_ctx,
            DummyDecoder::arc(),
            true,
        );

        // Each case pairs a reference with whether resolving it is expected to succeed.
        let cases = [
            (TableReference::bare("table_name"), true),
            (TableReference::partial("public", "table_name"), true),
            (TableReference::partial("wrong_schema", "table_name"), true),
            (TableReference::full("greptime", "public", "table_name"), true),
            (TableReference::full("wrong_catalog", "public", "table_name"), false),
            (TableReference::partial("information_schema", "columns"), true),
            (TableReference::full("greptime", "information_schema", "columns"), true),
            (TableReference::full("dummy", "information_schema", "columns"), false),
            (TableReference::full("greptime", "greptime_private", "columns"), true),
        ];

        for (table_ref, expect_ok) in cases {
            let description = table_ref.to_string();
            let outcome = table_provider.resolve_table_ref(table_ref);
            assert_eq!(expect_ok, outcome.is_ok(), "unexpected outcome for `{description}`");
        }
    }

    /// A registered view resolves to its decoded plan with the view's column names
    /// applied on top.
    #[tokio::test]
    async fn test_resolve_view() {
        let query_ctx = Arc::new(QueryContext::with("greptime", "public"));
        let backend = Arc::new(MemoryKvBackend::default());

        let cache_builder = LayeredCacheRegistryBuilder::default()
            .add_cache_registry(CacheRegistryBuilder::default().build())
            .add_cache_registry(build_fundamental_cache_registry(backend.clone()));
        let cache_registry = Arc::new(
            with_default_composite_cache_registry(cache_builder)
                .unwrap()
                .build(),
        );

        let catalog_manager = KvBackendCatalogManagerBuilder::new(
            Arc::new(NoopInformationExtension),
            backend.clone(),
            cache_registry,
        )
        .build();

        // Register a view whose plan columns `id`/`name` are exposed as `a`/`b`.
        let mut view_info = common_meta::key::test_utils::new_test_table_info(1024);
        view_info.table_type = TableType::View;
        TableMetadataManager::new(backend)
            .create_view_metadata(
                view_info.clone(),
                vec![1, 2, 3],
                HashSet::new(),
                vec!["a".to_string(), "b".to_string()],
                vec!["id".to_string(), "name".to_string()],
                "definition".to_string(),
            )
            .await
            .unwrap();

        let mut table_provider = DfTableSourceProvider::new(
            catalog_manager,
            true,
            query_ctx,
            MockDecoder::arc(),
            true,
        );

        // View not found
        let missing = TableReference::bare("not_exists_view");
        assert!(table_provider.resolve_table(missing).await.is_err());

        let existing = TableReference::bare(view_info.name);
        let source = table_provider.resolve_table(existing).await.unwrap();
        assert_eq!(
            r#"
Projection: person.id AS a, person.name AS b
  Filter: person.id > Int32(500)
    TableScan: person"#,
            format!("\n{}", source.get_logical_plan().unwrap())
        );
    }
}