catalog/
table_source.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use bytes::Bytes;
19use common_catalog::format_full_table_name;
20use common_query::logical_plan::{rename_logical_plan_columns, SubstraitPlanDecoderRef};
21use datafusion::common::{ResolvedTableReference, TableReference};
22use datafusion::datasource::view::ViewTable;
23use datafusion::datasource::{provider_as_source, TableProvider};
24use datafusion::logical_expr::TableSource;
25use itertools::Itertools;
26use session::context::QueryContextRef;
27use snafu::{ensure, OptionExt, ResultExt};
28use table::metadata::TableType;
29use table::table::adapter::DfTableProviderAdapter;
30pub mod dummy_catalog;
31use dummy_catalog::DummyCatalogList;
32use table::TableRef;
33
34use crate::error::{
35    CastManagerSnafu, DatafusionSnafu, DecodePlanSnafu, GetViewCacheSnafu, ProjectViewColumnsSnafu,
36    QueryAccessDeniedSnafu, Result, TableNotExistSnafu, ViewInfoNotFoundSnafu,
37    ViewPlanColumnsChangedSnafu,
38};
39use crate::kvbackend::KvBackendCatalogManager;
40use crate::CatalogManagerRef;
41
42pub struct DfTableSourceProvider {
43    catalog_manager: CatalogManagerRef,
44    resolved_tables: HashMap<String, Arc<dyn TableSource>>,
45    disallow_cross_catalog_query: bool,
46    default_catalog: String,
47    default_schema: String,
48    query_ctx: QueryContextRef,
49    plan_decoder: SubstraitPlanDecoderRef,
50    enable_ident_normalization: bool,
51}
52
53impl DfTableSourceProvider {
54    pub fn new(
55        catalog_manager: CatalogManagerRef,
56        disallow_cross_catalog_query: bool,
57        query_ctx: QueryContextRef,
58        plan_decoder: SubstraitPlanDecoderRef,
59        enable_ident_normalization: bool,
60    ) -> Self {
61        Self {
62            catalog_manager,
63            disallow_cross_catalog_query,
64            resolved_tables: HashMap::new(),
65            default_catalog: query_ctx.current_catalog().to_owned(),
66            default_schema: query_ctx.current_schema(),
67            query_ctx,
68            plan_decoder,
69            enable_ident_normalization,
70        }
71    }
72
73    pub fn resolve_table_ref(&self, table_ref: TableReference) -> Result<ResolvedTableReference> {
74        if self.disallow_cross_catalog_query {
75            match &table_ref {
76                TableReference::Bare { .. } | TableReference::Partial { .. } => {}
77                TableReference::Full {
78                    catalog, schema, ..
79                } => {
80                    ensure!(
81                        catalog.as_ref() == self.default_catalog,
82                        QueryAccessDeniedSnafu {
83                            catalog: catalog.as_ref(),
84                            schema: schema.as_ref(),
85                        }
86                    );
87                }
88            };
89        }
90
91        Ok(table_ref.resolve(&self.default_catalog, &self.default_schema))
92    }
93
94    pub async fn resolve_table(
95        &mut self,
96        table_ref: TableReference,
97    ) -> Result<Arc<dyn TableSource>> {
98        let table_ref = self.resolve_table_ref(table_ref)?;
99
100        let resolved_name = table_ref.to_string();
101        if let Some(table) = self.resolved_tables.get(&resolved_name) {
102            return Ok(table.clone());
103        }
104
105        let catalog_name = table_ref.catalog.as_ref();
106        let schema_name = table_ref.schema.as_ref();
107        let table_name = table_ref.table.as_ref();
108
109        let table = self
110            .catalog_manager
111            .table(catalog_name, schema_name, table_name, Some(&self.query_ctx))
112            .await?
113            .with_context(|| TableNotExistSnafu {
114                table: format_full_table_name(catalog_name, schema_name, table_name),
115            })?;
116
117        let provider: Arc<dyn TableProvider> = if table.table_info().table_type == TableType::View {
118            self.create_view_provider(&table).await?
119        } else {
120            Arc::new(DfTableProviderAdapter::new(table))
121        };
122
123        let source = provider_as_source(provider);
124
125        let _ = self.resolved_tables.insert(resolved_name, source.clone());
126        Ok(source)
127    }
128
129    async fn create_view_provider(&self, table: &TableRef) -> Result<Arc<dyn TableProvider>> {
130        let catalog_manager = self
131            .catalog_manager
132            .as_any()
133            .downcast_ref::<KvBackendCatalogManager>()
134            .context(CastManagerSnafu)?;
135
136        let view_info = catalog_manager
137            .view_info_cache()?
138            .get(table.table_info().ident.table_id)
139            .await
140            .context(GetViewCacheSnafu)?
141            .context(ViewInfoNotFoundSnafu {
142                name: &table.table_info().name,
143            })?;
144
145        // Build the catalog list provider for deserialization.
146        let catalog_list = Arc::new(DummyCatalogList::new(self.catalog_manager.clone()));
147        let logical_plan = self
148            .plan_decoder
149            .decode(Bytes::from(view_info.view_info.clone()), catalog_list, true)
150            .await
151            .context(DecodePlanSnafu {
152                name: &table.table_info().name,
153            })?;
154
155        let columns: Vec<_> = view_info.columns.iter().map(|c| c.as_str()).collect();
156
157        let original_plan_columns: Vec<_> =
158            view_info.plan_columns.iter().map(|c| c.as_str()).collect();
159
160        let plan_columns: Vec<_> = logical_plan
161            .schema()
162            .columns()
163            .into_iter()
164            .map(|c| c.name)
165            .collect();
166
167        // Only check columns number, because substrait doesn't include aliases currently.
168        // See https://github.com/apache/datafusion/issues/10815#issuecomment-2158666881
169        // and https://github.com/apache/datafusion/issues/6489
170        // TODO(dennis): check column names
171        ensure!(
172            original_plan_columns.len() == plan_columns.len(),
173            ViewPlanColumnsChangedSnafu {
174                origin_names: original_plan_columns.iter().join(","),
175                actual_names: plan_columns.iter().join(","),
176            }
177        );
178
179        // We have to do `columns` projection here, because
180        // substrait doesn't include aliases neither for tables nor for columns:
181        // https://github.com/apache/datafusion/issues/10815#issuecomment-2158666881
182        let logical_plan = if !columns.is_empty() {
183            rename_logical_plan_columns(
184                self.enable_ident_normalization,
185                logical_plan,
186                plan_columns
187                    .iter()
188                    .map(|c| c.as_str())
189                    .zip(columns.into_iter())
190                    .collect(),
191            )
192            .context(ProjectViewColumnsSnafu)?
193        } else {
194            logical_plan
195        };
196
197        Ok(Arc::new(
198            ViewTable::try_new(logical_plan, Some(view_info.definition.to_string()))
199                .context(DatafusionSnafu)?,
200        ))
201    }
202}
203
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
    use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
    use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
    use common_meta::key::TableMetadataManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_query::error::Result as QueryResult;
    use common_query::logical_plan::SubstraitPlanDecoder;
    use common_query::test_util::DummyDecoder;
    use datafusion::catalog::CatalogProviderList;
    use datafusion::logical_expr::builder::LogicalTableSource;
    use datafusion::logical_expr::{col, lit, LogicalPlan, LogicalPlanBuilder};
    use session::context::QueryContext;

    use super::*;
    use crate::information_schema::NoopInformationExtension;
    use crate::memory::MemoryCatalogManager;

    #[test]
    fn test_validate_table_ref() {
        let provider = DfTableSourceProvider::new(
            MemoryCatalogManager::with_default_setup(),
            true,
            Arc::new(QueryContext::with("greptime", "public")),
            DummyDecoder::arc(),
            true,
        );

        // Each case pairs a reference with whether it must be accepted. Only the catalog
        // of a fully-qualified reference is checked, so foreign schemas still pass.
        let cases = [
            (TableReference::bare("table_name"), true),
            (TableReference::partial("public", "table_name"), true),
            (TableReference::partial("wrong_schema", "table_name"), true),
            (TableReference::full("greptime", "public", "table_name"), true),
            (TableReference::full("wrong_catalog", "public", "table_name"), false),
            (TableReference::partial("information_schema", "columns"), true),
            (TableReference::full("greptime", "information_schema", "columns"), true),
            (TableReference::full("dummy", "information_schema", "columns"), false),
            (TableReference::full("greptime", "greptime_private", "columns"), true),
        ];

        for (table_ref, accepted) in cases {
            assert_eq!(provider.resolve_table_ref(table_ref).is_ok(), accepted);
        }
    }

    /// Decoder that ignores its input and always yields [`mock_plan`].
    struct MockDecoder;

    impl MockDecoder {
        pub fn arc() -> Arc<Self> {
            Arc::new(Self)
        }
    }

    #[async_trait::async_trait]
    impl SubstraitPlanDecoder for MockDecoder {
        async fn decode(
            &self,
            _message: bytes::Bytes,
            _catalog_list: Arc<dyn CatalogProviderList>,
            _optimize: bool,
        ) -> QueryResult<LogicalPlan> {
            Ok(mock_plan())
        }
    }

    /// Scan of a two-column `person` table (`id: Int32`, `name: Utf8`) filtered by
    /// `id > 500`.
    fn mock_plan() -> LogicalPlan {
        let person_schema = SchemaRef::new(Schema::new(vec![
            Field::new("id", DataType::Int32, true),
            Field::new("name", DataType::Utf8, true),
        ]));
        let person_source = Arc::new(LogicalTableSource::new(person_schema));

        LogicalPlanBuilder::scan("person", person_source, None)
            .and_then(|builder| builder.filter(col("id").gt(lit(500))))
            .and_then(|builder| builder.build())
            .unwrap()
    }

    #[tokio::test]
    async fn test_resolve_view() {
        let backend = Arc::new(MemoryKvBackend::default());

        // Layered cache registry: a base registry, the fundamental caches over the
        // in-memory backend, then the default composite caches.
        let cache_registry_builder = LayeredCacheRegistryBuilder::default()
            .add_cache_registry(CacheRegistryBuilder::default().build())
            .add_cache_registry(build_fundamental_cache_registry(backend.clone()));
        let cache_registry = Arc::new(
            with_default_composite_cache_registry(cache_registry_builder)
                .unwrap()
                .build(),
        );

        let catalog_manager = KvBackendCatalogManager::new(
            Arc::new(NoopInformationExtension),
            backend.clone(),
            cache_registry,
            None,
        );

        // Persist a view exposing columns `a`, `b` over plan columns `id`, `name`.
        // MockDecoder ignores the stored bytes, so the plan payload is a placeholder.
        let mut view_info = common_meta::key::test_utils::new_test_table_info(1024, vec![]);
        view_info.table_type = TableType::View;
        TableMetadataManager::new(backend)
            .create_view_metadata(
                view_info.clone().into(),
                vec![1, 2, 3],
                HashSet::new(),
                vec!["a".to_string(), "b".to_string()],
                vec!["id".to_string(), "name".to_string()],
                "definition".to_string(),
            )
            .await
            .unwrap();

        let mut provider = DfTableSourceProvider::new(
            catalog_manager,
            true,
            Arc::new(QueryContext::with("greptime", "public")),
            MockDecoder::arc(),
            true,
        );

        // An unknown name must resolve to an error.
        assert!(provider
            .resolve_table(TableReference::bare("not_exists_view"))
            .await
            .is_err());

        let source = provider
            .resolve_table(TableReference::bare(view_info.name))
            .await
            .unwrap();
        let expected = r#"
Projection: person.id AS a, person.name AS b
  Filter: person.id > Int32(500)
    TableScan: person"#;
        assert_eq!(expected, format!("\n{}", source.get_logical_plan().unwrap()));
    }
}