1use std::collections::HashMap;
16use std::sync::Arc;
17
18use bytes::Bytes;
19use common_catalog::format_full_table_name;
20use common_query::logical_plan::{rename_logical_plan_columns, SubstraitPlanDecoderRef};
21use datafusion::common::{ResolvedTableReference, TableReference};
22use datafusion::datasource::view::ViewTable;
23use datafusion::datasource::{provider_as_source, TableProvider};
24use datafusion::logical_expr::TableSource;
25use itertools::Itertools;
26use session::context::QueryContextRef;
27use snafu::{ensure, OptionExt, ResultExt};
28use table::metadata::TableType;
29use table::table::adapter::DfTableProviderAdapter;
30pub mod dummy_catalog;
31use dummy_catalog::DummyCatalogList;
32use table::TableRef;
33
34use crate::error::{
35 CastManagerSnafu, DatafusionSnafu, DecodePlanSnafu, GetViewCacheSnafu, ProjectViewColumnsSnafu,
36 QueryAccessDeniedSnafu, Result, TableNotExistSnafu, ViewInfoNotFoundSnafu,
37 ViewPlanColumnsChangedSnafu,
38};
39use crate::kvbackend::KvBackendCatalogManager;
40use crate::CatalogManagerRef;
41
42pub struct DfTableSourceProvider {
43 catalog_manager: CatalogManagerRef,
44 resolved_tables: HashMap<String, Arc<dyn TableSource>>,
45 disallow_cross_catalog_query: bool,
46 default_catalog: String,
47 default_schema: String,
48 query_ctx: QueryContextRef,
49 plan_decoder: SubstraitPlanDecoderRef,
50 enable_ident_normalization: bool,
51}
52
53impl DfTableSourceProvider {
54 pub fn new(
55 catalog_manager: CatalogManagerRef,
56 disallow_cross_catalog_query: bool,
57 query_ctx: QueryContextRef,
58 plan_decoder: SubstraitPlanDecoderRef,
59 enable_ident_normalization: bool,
60 ) -> Self {
61 Self {
62 catalog_manager,
63 disallow_cross_catalog_query,
64 resolved_tables: HashMap::new(),
65 default_catalog: query_ctx.current_catalog().to_owned(),
66 default_schema: query_ctx.current_schema(),
67 query_ctx,
68 plan_decoder,
69 enable_ident_normalization,
70 }
71 }
72
73 pub fn resolve_table_ref(&self, table_ref: TableReference) -> Result<ResolvedTableReference> {
74 if self.disallow_cross_catalog_query {
75 match &table_ref {
76 TableReference::Bare { .. } | TableReference::Partial { .. } => {}
77 TableReference::Full {
78 catalog, schema, ..
79 } => {
80 ensure!(
81 catalog.as_ref() == self.default_catalog,
82 QueryAccessDeniedSnafu {
83 catalog: catalog.as_ref(),
84 schema: schema.as_ref(),
85 }
86 );
87 }
88 };
89 }
90
91 Ok(table_ref.resolve(&self.default_catalog, &self.default_schema))
92 }
93
94 pub async fn resolve_table(
95 &mut self,
96 table_ref: TableReference,
97 ) -> Result<Arc<dyn TableSource>> {
98 let table_ref = self.resolve_table_ref(table_ref)?;
99
100 let resolved_name = table_ref.to_string();
101 if let Some(table) = self.resolved_tables.get(&resolved_name) {
102 return Ok(table.clone());
103 }
104
105 let catalog_name = table_ref.catalog.as_ref();
106 let schema_name = table_ref.schema.as_ref();
107 let table_name = table_ref.table.as_ref();
108
109 let table = self
110 .catalog_manager
111 .table(catalog_name, schema_name, table_name, Some(&self.query_ctx))
112 .await?
113 .with_context(|| TableNotExistSnafu {
114 table: format_full_table_name(catalog_name, schema_name, table_name),
115 })?;
116
117 let provider: Arc<dyn TableProvider> = if table.table_info().table_type == TableType::View {
118 self.create_view_provider(&table).await?
119 } else {
120 Arc::new(DfTableProviderAdapter::new(table))
121 };
122
123 let source = provider_as_source(provider);
124
125 let _ = self.resolved_tables.insert(resolved_name, source.clone());
126 Ok(source)
127 }
128
129 async fn create_view_provider(&self, table: &TableRef) -> Result<Arc<dyn TableProvider>> {
130 let catalog_manager = self
131 .catalog_manager
132 .as_any()
133 .downcast_ref::<KvBackendCatalogManager>()
134 .context(CastManagerSnafu)?;
135
136 let view_info = catalog_manager
137 .view_info_cache()?
138 .get(table.table_info().ident.table_id)
139 .await
140 .context(GetViewCacheSnafu)?
141 .context(ViewInfoNotFoundSnafu {
142 name: &table.table_info().name,
143 })?;
144
145 let catalog_list = Arc::new(DummyCatalogList::new(self.catalog_manager.clone()));
147 let logical_plan = self
148 .plan_decoder
149 .decode(Bytes::from(view_info.view_info.clone()), catalog_list, true)
150 .await
151 .context(DecodePlanSnafu {
152 name: &table.table_info().name,
153 })?;
154
155 let columns: Vec<_> = view_info.columns.iter().map(|c| c.as_str()).collect();
156
157 let original_plan_columns: Vec<_> =
158 view_info.plan_columns.iter().map(|c| c.as_str()).collect();
159
160 let plan_columns: Vec<_> = logical_plan
161 .schema()
162 .columns()
163 .into_iter()
164 .map(|c| c.name)
165 .collect();
166
167 ensure!(
172 original_plan_columns.len() == plan_columns.len(),
173 ViewPlanColumnsChangedSnafu {
174 origin_names: original_plan_columns.iter().join(","),
175 actual_names: plan_columns.iter().join(","),
176 }
177 );
178
179 let logical_plan = if !columns.is_empty() {
183 rename_logical_plan_columns(
184 self.enable_ident_normalization,
185 logical_plan,
186 plan_columns
187 .iter()
188 .map(|c| c.as_str())
189 .zip(columns.into_iter())
190 .collect(),
191 )
192 .context(ProjectViewColumnsSnafu)?
193 } else {
194 logical_plan
195 };
196
197 Ok(Arc::new(
198 ViewTable::try_new(logical_plan, Some(view_info.definition.to_string()))
199 .context(DatafusionSnafu)?,
200 ))
201 }
202}
203
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
    use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
    use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
    use common_meta::key::TableMetadataManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_query::error::Result as QueryResult;
    use common_query::logical_plan::SubstraitPlanDecoder;
    use common_query::test_util::DummyDecoder;
    use datafusion::catalog::CatalogProviderList;
    use datafusion::logical_expr::builder::LogicalTableSource;
    use datafusion::logical_expr::{col, lit, LogicalPlan, LogicalPlanBuilder};
    use session::context::QueryContext;

    use super::*;
    use crate::information_schema::NoopInformationExtension;
    use crate::kvbackend::KvBackendCatalogManagerBuilder;
    use crate::memory::MemoryCatalogManager;

    #[test]
    fn test_validate_table_ref() {
        let query_ctx = Arc::new(QueryContext::with("greptime", "public"));

        let table_provider = DfTableSourceProvider::new(
            MemoryCatalogManager::with_default_setup(),
            true,
            query_ctx.clone(),
            DummyDecoder::arc(),
            true,
        );

        let table_ref = TableReference::bare("table_name");
        let result = table_provider.resolve_table_ref(table_ref);
        assert!(result.is_ok());

        let table_ref = TableReference::partial("public", "table_name");
        let result = table_provider.resolve_table_ref(table_ref);
        assert!(result.is_ok());

        let table_ref = TableReference::partial("wrong_schema", "table_name");
        let result = table_provider.resolve_table_ref(table_ref);
        assert!(result.is_ok());

        let table_ref = TableReference::full("greptime", "public", "table_name");
        let result = table_provider.resolve_table_ref(table_ref);
        assert!(result.is_ok());

        let table_ref = TableReference::full("wrong_catalog", "public", "table_name");
        let result = table_provider.resolve_table_ref(table_ref);
        assert!(result.is_err());

        let table_ref = TableReference::partial("information_schema", "columns");
        let result = table_provider.resolve_table_ref(table_ref);
        assert!(result.is_ok());

        let table_ref = TableReference::full("greptime", "information_schema", "columns");
        assert!(table_provider.resolve_table_ref(table_ref).is_ok());

        let table_ref = TableReference::full("dummy", "information_schema", "columns");
        assert!(table_provider.resolve_table_ref(table_ref).is_err());

        let table_ref = TableReference::full("greptime", "greptime_private", "columns");
        assert!(table_provider.resolve_table_ref(table_ref).is_ok());
    }

    /// Plan decoder that ignores its input and always returns `mock_plan()`.
    struct MockDecoder;

    impl MockDecoder {
        pub fn arc() -> Arc<Self> {
            Arc::new(MockDecoder)
        }
    }

    #[async_trait::async_trait]
    impl SubstraitPlanDecoder for MockDecoder {
        async fn decode(
            &self,
            _message: bytes::Bytes,
            _catalog_list: Arc<dyn CatalogProviderList>,
            _optimize: bool,
        ) -> QueryResult<LogicalPlan> {
            Ok(mock_plan())
        }
    }

    /// Scan of table `person` (nullable `id: Int32`, `name: Utf8`) filtered by `id > 500`.
    fn mock_plan() -> LogicalPlan {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int32, true),
            Field::new("name", DataType::Utf8, true),
        ]);
        let table_source = LogicalTableSource::new(SchemaRef::new(schema));

        LogicalPlanBuilder::scan("person", Arc::new(table_source), None)
            .unwrap()
            .filter(col("id").gt(lit(500)))
            .unwrap()
            .build()
            .unwrap()
    }

    #[tokio::test]
    async fn test_resolve_view() {
        let query_ctx = Arc::new(QueryContext::with("greptime", "public"));
        let backend = Arc::new(MemoryKvBackend::default());
        let layered_cache_builder = LayeredCacheRegistryBuilder::default()
            .add_cache_registry(CacheRegistryBuilder::default().build());
        let fundamental_cache_registry = build_fundamental_cache_registry(backend.clone());
        let layered_cache_registry = Arc::new(
            with_default_composite_cache_registry(
                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
            )
            .unwrap()
            .build(),
        );

        let catalog_manager = KvBackendCatalogManagerBuilder::new(
            Arc::new(NoopInformationExtension),
            backend.clone(),
            layered_cache_registry,
        )
        .build();

        let table_metadata_manager = TableMetadataManager::new(backend);
        let mut view_info = common_meta::key::test_utils::new_test_table_info(1024, vec![]);
        view_info.table_type = TableType::View;
        // The encoded plan bytes are irrelevant: `MockDecoder` ignores them.
        let logical_plan = vec![1, 2, 3];
        // Register the view with columns `a, b` mapped onto plan columns `id, name`.
        table_metadata_manager
            .create_view_metadata(
                view_info.clone().into(),
                logical_plan,
                HashSet::new(),
                vec!["a".to_string(), "b".to_string()],
                vec!["id".to_string(), "name".to_string()],
                "definition".to_string(),
            )
            .await
            .unwrap();

        let mut table_provider = DfTableSourceProvider::new(
            catalog_manager,
            true,
            query_ctx.clone(),
            MockDecoder::arc(),
            true,
        );

        // Resolving an unknown name must fail.
        let table_ref = TableReference::bare("not_exists_view");
        assert!(table_provider.resolve_table(table_ref).await.is_err());

        // Resolving the view yields the decoded plan with its columns renamed to `a, b`.
        let table_ref = TableReference::bare(view_info.name);
        let source = table_provider.resolve_table(table_ref).await.unwrap();
        assert_eq!(
            r#"
Projection: person.id AS a, person.name AS b
  Filter: person.id > Int32(500)
    TableScan: person"#,
            format!("\n{}", source.get_logical_plan().unwrap())
        );
    }
}